From 6cad8c5e45c1713392df7e525817772a8cca2749 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 20 Apr 2017 10:03:00 +0200 Subject: [PATCH 1/5] wip --- .../elasticsearch/index/engine/EngineConfig.java | 14 ++++---------- .../index/engine/InternalEngine.java | 14 +++++++++----- .../elasticsearch/index/shard/IndexShard.java | 16 +++++++--------- .../elasticsearch/index/shard/StoreRecovery.java | 5 ++--- ...overyPrepareForTranslogOperationsRequest.java | 15 +++++---------- .../indices/recovery/RecoverySourceHandler.java | 6 +++--- .../indices/recovery/RecoveryTarget.java | 8 ++------ .../indices/recovery/RecoveryTargetHandler.java | 4 +--- .../recovery/RemoteRecoveryTargetHandler.java | 4 ++-- 9 files changed, 35 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index 7852d2c2db089..d22a93273c7e1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -27,7 +27,6 @@ import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.Sort; import org.apache.lucene.search.similarities.Similarity; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; @@ -67,7 +66,6 @@ public final class EngineConfig { private final Engine.EventListener eventListener; private final QueryCache queryCache; private final QueryCachingPolicy queryCachingPolicy; - private final long maxUnsafeAutoIdTimestamp; @Nullable private final ReferenceManager.RefreshListener refreshListeners; @Nullable @@ -116,7 +114,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, Similarity similarity, CodecService codecService, Engine.EventListener eventListener, TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners, - long maxUnsafeAutoIdTimestamp, Sort indexSort) { + Sort indexSort) { if (openMode == null) { throw new IllegalArgumentException("openMode must not be null"); } @@ -143,9 +141,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool, this.flushMergesAfter = flushMergesAfter; this.openMode = openMode; this.refreshListeners = refreshListeners; - assert maxUnsafeAutoIdTimestamp >= IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP : - "maxUnsafeAutoIdTimestamp must be >= -1 but was " + maxUnsafeAutoIdTimestamp; - this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp; this.indexSort = indexSort; } @@ -333,11 +328,10 @@ public ReferenceManager.RefreshListener getRefreshListeners() { } /** - * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine. - * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs + * returns true if the engine is allowed to optimize indexing operations with an auto-generated ID */ - public long getMaxUnsafeAutoIdTimestamp() { - return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS) ? maxUnsafeAutoIdTimestamp : Long.MAX_VALUE; + public boolean isAutoGeneratedIDsOptimizationEnabled() { + return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS); } /** diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 3e5d3453cacf9..0a76eff4fa89a 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -136,11 +136,8 @@ public class InternalEngine extends Engine { public InternalEngine(EngineConfig engineConfig) throws EngineException { super(engineConfig); openMode = engineConfig.getOpenMode(); - if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_beta1)) { - // no optimization for pre 5.0.0.alpha6 since translog might not have all information needed + if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); - } else { - maxUnsafeAutoIdTimestamp.set(engineConfig.getMaxUnsafeAutoIdTimestamp()); } this.versionMap = new LiveVersionMap(); store.incRef(); @@ -236,6 +233,13 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { break; } } + assert engineConfig.getOpenMode() != EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG || commitMaxUnsafeAutoIdTimestamp >= -1L : + "recovering from a remote primary but max_unsafe_auto_id_timestamp is not found in commit"; + assert (engineConfig.getOpenMode() == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG && + engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) == false + || commitMaxUnsafeAutoIdTimestamp >= -1L : + "recovering from a local store but max_unsafe_auto_id_timestamp is not found in commit"; + lsdajfalsk;djf deal with recovery from store maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } @@ -1836,7 +1840,7 @@ public void onSettingsChanged() { mergeScheduler.refreshConfig(); // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed: maybePruneDeletedTombstones(); - if (engineConfig.getMaxUnsafeAutoIdTimestamp() == Long.MAX_VALUE) { + if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) { // this is an anti-viral settings you can only opt out for the entire index // only if a shard starts up again due to relocation or if the index is closed // the setting will be re-interpreted if it's set to true diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index d1ca4f13a42da..6984af8b900b4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -42,7 +42,6 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -1040,11 +1039,11 @@ public void performTranslogRecovery(boolean indexExists) throws IOException { translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0); } - internalPerformTranslogRecovery(false, indexExists, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + internalPerformTranslogRecovery(false, indexExists); assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage(); } - private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException { + private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException { if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); } @@ -1073,7 +1072,7 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole } else { openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } - final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp); + final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering config.setEnableGcDeletes(false); @@ -1096,9 +1095,9 @@ protected void onNewEngine(Engine newEngine) { * the replay of the transaction log which is required in cases where we restore a previous index or recover from * a remote peer. */ - public void skipTranslogRecovery(long maxUnsafeAutoIdTimestamp) throws IOException { + public void skipTranslogRecovery() throws IOException { assert getEngineOrNull() == null : "engine was already created"; - internalPerformTranslogRecovery(true, true, maxUnsafeAutoIdTimestamp); + internalPerformTranslogRecovery(true, true); assert recoveryState.getTranslog().recoveredOperations() == 0; } @@ -1795,14 +1794,13 @@ private DocumentMapperForType docMapper(String type) { return mapperService.documentMapperWithAutoCreate(type); } - private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, long maxUnsafeAutoIdTimestamp) { + private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) { final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger); Sort indexSort = indexSortSupplier.get(); return new EngineConfig(openMode, shardId, threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(), mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, - maxUnsafeAutoIdTimestamp, indexSort); + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort); } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 6cfaca8c45b4b..032c033f175b1 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -31,7 +31,6 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.RecoverySource; @@ -353,7 +352,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe recoveryState.getIndex().updateVersion(version); if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { assert indexShouldExists; - indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + indexShard.skipTranslogRecovery(); } else { // since we recover from local, just fill the files and size try { @@ -405,7 +404,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f } final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName); repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState()); - indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); + indexShard.skipTranslogRecovery(); indexShard.finalizeRecovery(); indexShard.postRecovery("restore done"); } catch (Exception e) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index 94425f627994c..e11233e08710d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -19,7 +19,7 @@ package org.elasticsearch.indices.recovery; -import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -29,7 +29,6 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest { - private long maxUnsafeAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; private long recoveryId; private ShardId shardId; private int totalTranslogOps = RecoveryState.Translog.UNKNOWN; @@ -37,11 +36,10 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques public RecoveryPrepareForTranslogOperationsRequest() { } - RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, long maxUnsafeAutoIdTimestamp) { + RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) { this.recoveryId = recoveryId; this.shardId = shardId; this.totalTranslogOps = totalTranslogOps; - this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp; } public long recoveryId() { @@ -56,17 +54,15 @@ public int totalTranslogOps() { return totalTranslogOps; } - public long getMaxUnsafeAutoIdTimestamp() { - return maxUnsafeAutoIdTimestamp; - } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); totalTranslogOps = in.readVInt(); - maxUnsafeAutoIdTimestamp = in.readLong(); + if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { + in.readLong(); // maxUnsafeAutoIdTimestamp + } } @Override @@ -75,6 +71,5 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(recoveryId); shardId.writeTo(out); out.writeVInt(totalTranslogOps); - out.writeLong(maxUnsafeAutoIdTimestamp); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index a6aa47492e1c4..40f9f7f74f895 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -157,7 +157,7 @@ public RecoveryResponse recoverToTarget() throws IOException { } try { - prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp()); + prepareTargetForTranslog(translogView.totalOperations()); } catch (final Exception e) { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } @@ -389,13 +389,13 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) } } - void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException { + void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { StopWatch stopWatch = new StopWatch().start(); logger.trace("recovery [phase1]: prepare remote engine for translog"); final long startEngineStart = stopWatch.totalTime().millis(); // Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables // garbage collection (not the JVM's GC!) of tombstone deletes. - cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp)); + cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps)); stopWatch.stop(); response.startTime = stopWatch.totalTime().millis() - startEngineStart; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index d9886efa07b21..b12006bbd3ce5 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -36,7 +36,6 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRefCounted; @@ -49,7 +48,6 @@ import org.elasticsearch.index.translog.Translog; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -58,8 +56,6 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -360,9 +356,9 @@ private void ensureRefCount() { /*** Implementation of {@link RecoveryTargetHandler } */ @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { state().getTranslog().totalOperations(totalTranslogOps); - indexShard().skipTranslogRecovery(maxUnsafeAutoIdTimestamp); + indexShard().skipTranslogRecovery(); } @Override diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 831181c631159..bdace02d218ba 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -33,10 +33,8 @@ public interface RecoveryTargetHandler { * Prepares the target to receive translog operations, after all file have been copied * * @param totalTranslogOps total translog operations expected to be sent - * @param maxUnsafeAutoIdTimestamp the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine. - * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs */ - void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException; + void prepareForTranslogOperations(int totalTranslogOps) throws IOException; /** * The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 5fa1ca22c7065..959522d297db2 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -78,9 +78,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, String targ } @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, - new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, maxUnsafeAutoIdTimestamp), + new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } From e8789f5a6cdbaefe06b3b093dbfe3d9706f61ac0 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 21 Apr 2017 08:58:54 +0200 Subject: [PATCH 2/5] fix tests --- .../index/engine/InternalEngine.java | 9 +- .../elasticsearch/index/shard/IndexShard.java | 22 +++++ .../recovery/PeerRecoveryTargetService.java | 2 +- .../index/engine/InternalEngineTests.java | 95 ++++++++----------- .../RecoveryDuringReplicationTests.java | 6 +- .../index/shard/IndexShardTests.java | 4 +- .../index/shard/RefreshListenersTests.java | 3 +- .../recovery/RecoverySourceHandlerTests.java | 4 +- 8 files changed, 71 insertions(+), 74 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0a76eff4fa89a..8e1eb059caa5b 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -128,7 +128,7 @@ public class InternalEngine extends Engine { private final AtomicInteger throttleRequestCount = new AtomicInteger(); private final EngineConfig.OpenMode openMode; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); - private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; + public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp"; private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); @@ -233,13 +233,6 @@ private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) { break; } } - assert engineConfig.getOpenMode() != EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG || commitMaxUnsafeAutoIdTimestamp >= -1L : - "recovering from a remote primary but max_unsafe_auto_id_timestamp is not found in commit"; - assert (engineConfig.getOpenMode() == EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG && - engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) == false - || commitMaxUnsafeAutoIdTimestamp >= -1L : - "recovering from a local store but max_unsafe_auto_id_timestamp is not found in commit"; - lsdajfalsk;djf deal with recovery from store maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp)); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 6984af8b900b4..dd2c99fc12ff6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -27,6 +27,7 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy; +import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.search.Query; @@ -38,6 +39,7 @@ import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.ThreadInterruptedException; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -78,6 +80,7 @@ import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.Segment; @@ -1072,6 +1075,25 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole } else { openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } + + boolean assertionsEnabled = false; + assert assertionsEnabled = true; + // TODO: add this for shrinked indices. + if (assertionsEnabled) { + final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); + if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) { + // as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point. + // This should have baked into the commit by the primary we recover from, regardless of the index age. + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit"; + } else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE && + indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) { + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + + " is not found in commit"; + } + } + final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering // but we need to make sure we don't loose deletes until we are done recovering diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index a93cdd51e3842..f449f9ffbe42d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -377,7 +377,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp()); + recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index a5bdf5c39641c..41adb2489d737 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -56,10 +56,10 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.search.TermQuery; -import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortedSetSortField; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; @@ -262,7 +262,7 @@ public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, An config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), analyzer, config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(), - config.getMaxUnsafeAutoIdTimestamp(), config.getIndexSort()); + config.getIndexSort()); } @Override @@ -371,7 +371,7 @@ protected InternalEngine createEngine( @Nullable IndexWriterFactory indexWriterFactory, @Nullable Supplier sequenceNumbersServiceSupplier, @Nullable Sort indexSort) throws IOException { - EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null, indexSort); + EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort); InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, config); if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { internalEngine.recoverFromTranslog(); @@ -404,25 +404,22 @@ public SequenceNumbersService seqNoService() { } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), - maxUnsafeAutoIdTimestamp, refreshListener, null); + ReferenceManager.RefreshListener refreshListener) { + return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, null); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener, Sort indexSort) { - return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), - maxUnsafeAutoIdTimestamp, refreshListener, indexSort); + ReferenceManager.RefreshListener refreshListener, Sort indexSort) { + return config(indexSettings, store, translogPath, mergePolicy, createSnapshotDeletionPolicy(), refreshListener, indexSort); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp, - ReferenceManager.RefreshListener refreshListener) { - return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, maxUnsafeAutoIdTimestamp, refreshListener, null); + SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener) { + return config(indexSettings, store, translogPath, mergePolicy, deletionPolicy, refreshListener, null); } public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, - SnapshotDeletionPolicy deletionPolicy, long maxUnsafeAutoIdTimestamp, + SnapshotDeletionPolicy deletionPolicy, ReferenceManager.RefreshListener refreshListener, Sort indexSort) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); @@ -445,8 +442,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, deletionPolicy, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, new TranslogHandler(xContentRegistry(), shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, - maxUnsafeAutoIdTimestamp, indexSort); + IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener, indexSort); return config; } @@ -1170,8 +1166,7 @@ public void testSearchResultRelease() throws Exception { public void testSyncedFlush() throws IOException { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null); engine.index(indexForDoc(doc)); @@ -1198,7 +1193,7 @@ public void testRenewSyncFlush() throws Exception { for (int i = 0; i < iters; i++) { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + new LogDocMergePolicy(), null))) { final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20); Engine.Index doc1 = indexForDoc(testParsedDocument("1", "test", null, testDocumentWithTextField(), B_1, null)); engine.index(doc1); @@ -1317,7 +1312,7 @@ public void testVersioningNewIndex() throws IOException { public void testForceMerge() throws IOException { try (Store store = createStore(); Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), - new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP + new LogByteSizeMergePolicy(), null))) { // use log MP here we test some behavior in ESMP int numDocs = randomIntBetween(10, 100); for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), "test", null, testDocument(), B_1, null); @@ -2132,8 +2127,7 @@ public void testSeqNoAndCheckpoints() throws IOException { public void testConcurrentWritesAndCommits() throws Exception { try (Store store = createStore(); InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), - new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + new SnapshotDeletionPolicy(NoDeletionPolicy.INSTANCE), null))) { final int numIndexingThreads = scaledRandomIntBetween(3, 6); final int numDocsPerThread = randomIntBetween(500, 1000); @@ -2274,7 +2268,7 @@ public void testIndexWriterIFDInfoStream() throws IllegalAccessException, IOExce public void testEnableGcDeletes() throws Exception { try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), null))) { engine.config().setEnableGcDeletes(false); // Add document @@ -2421,7 +2415,8 @@ public void testMissingTranslog() throws IOException { // expected } // now it should be OK. - EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); + EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), null), + EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG); engine = new InternalEngine(config); } @@ -2736,7 +2731,7 @@ public void testRecoverFromForeignTranslog() throws IOException { config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(), config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), config.getRefreshListeners(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); + TimeValue.timeValueMinutes(5), config.getRefreshListeners(), null); try { InternalEngine internalEngine = new InternalEngine(brokenConfig); @@ -2788,7 +2783,7 @@ public void run() { public void testCurrentTranslogIDisCommitted() throws IOException { try (Store store = createStore()) { - EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); + EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null); // create { @@ -3275,47 +3270,36 @@ public void testRetryConcurrently() throws InterruptedException, IOException { } public void testEngineMaxTimestampIsInitialized() throws IOException { - try (Store store = createStore(); - Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { - assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - - } - final long timestamp1 = Math.abs(randomLong()); + final long timestamp1 = Math.abs(randomNonNegativeLong()); final Path storeDir = createTempDir(); final Path translogDir = createTempDir(); + final long timestamp2 = randomNonNegativeLong(); + final long maxTimestamp12 = Math.max(timestamp1, timestamp2); try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine( - config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp1, null))) { + Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) { + assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), + new BytesArray("{}".getBytes(Charset.defaultCharset())), null); + engine.index(appendOnlyPrimary(doc, true, timestamp1)); assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } - final long timestamp2 = randomNonNegativeLong(); - final long timestamp3 = randomNonNegativeLong(); - final long maxTimestamp12 = Math.max(timestamp1, timestamp2); - final long maxTimestamp123 = Math.max(maxTimestamp12, timestamp3); try (Store store = createStore(newFSDirectory(storeDir)); - Engine engine = new InternalEngine( - copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, timestamp2, null), - randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { - assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - if (engine.config().getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) { - // recover from translog and commit maxTimestamp12 - engine.recoverFromTranslog(); - // force flush as the were no ops performed - engine.flush(true, false); - } + Engine engine = new InternalEngine(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null))) { + assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.recoverFromTranslog(); + assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", "test", null, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null); - engine.index(appendOnlyPrimary(doc, true, timestamp3)); - assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.index(appendOnlyPrimary(doc, true, timestamp2)); + assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); + engine.flush(); } try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine( - config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { + copy(config(defaultSettings, store, translogDir, NoMergePolicy.INSTANCE, null), + randomFrom(EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG, EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG)))) { assertEquals(maxTimestamp12, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); - engine.recoverFromTranslog(); - assertEquals(maxTimestamp123, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp()); } } @@ -3385,8 +3369,7 @@ public void testFailEngineOnRandomIO() throws IOException, InterruptedException CyclicBarrier join = new CyclicBarrier(2); CountDownLatch start = new CountDownLatch(1); AtomicInteger controller = new AtomicInteger(0); - EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), - IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() { + EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(), new ReferenceManager.RefreshListener() { @Override public void beforeRefresh() throws IOException { } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 139c7f500d8d7..349258785f03a 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -29,11 +29,11 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineTests; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; @@ -289,9 +289,9 @@ public long addDocument(Iterable doc) throws IOExcepti return new RecoveryTarget(indexShard, node, recoveryListener, l -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { preparedForTranslog.set(true); - super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp); + super.prepareForTranslogOperations(totalTranslogOps); } }; }); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 629a8af3e0d3c..eccd958c36e20 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1281,8 +1281,8 @@ public void testShardActiveDuringPeerRecovery() throws IOException { new RecoveryTarget(shard, discoveryNode, recoveryListener, aLong -> { }) { @Override - public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException { - super.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp); + public void prepareForTranslogOperations(int totalTranslogOps) throws IOException { + super.prepareForTranslogOperations(totalTranslogOps); // Shard is still inactive since we haven't started recovering yet assertFalse(replica.isActive()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 3e5a34c3921fe..aa3b9b1ee85c2 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -29,7 +29,6 @@ import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.Directory; import org.apache.lucene.util.IOUtils; -import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -123,7 +122,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) { store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, translogHandler, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, - TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null); + TimeValue.timeValueMinutes(5), listeners, null); engine = new InternalEngine(config); listeners.setTranslog(engine.getTranslog()); } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index 40a92b11e7372..e424eb399329e 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -393,7 +393,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) } @Override - void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException { + void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); } @@ -483,7 +483,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) } @Override - void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException { + void prepareTargetForTranslog(final int totalTranslogOps) throws IOException { prepareTargetForTranslogCalled.set(true); } From 064495bf082bc47bd61f68104a3264efe78d700f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 21 Apr 2017 09:16:06 +0200 Subject: [PATCH 3/5] fix assertions --- .../src/main/java/org/elasticsearch/index/shard/IndexShard.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index dd2c99fc12ff6..e0892bd906b38 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1079,7 +1079,7 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole boolean assertionsEnabled = false; assert assertionsEnabled = true; // TODO: add this for shrinked indices. - if (assertionsEnabled) { + if (assertionsEnabled && indexExists) { final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) { // as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point. From 3c04c94a75fcf2faf86a819aa98c17777b4bb9ce Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 21 Apr 2017 11:38:06 +0200 Subject: [PATCH 4/5] add wire compatibility to RecoveryPrepareForTranslogOperationsRequest for sending to old nodes, even though this is never used. test assertions require it --- .../recovery/RecoveryPrepareForTranslogOperationsRequest.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java index e11233e08710d..155aa53e71a5d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.Version; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -71,5 +72,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(recoveryId); shardId.writeTo(out); out.writeVInt(totalTranslogOps); + if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp + } } } From 3d84faaeca3c70285f9ae53fb24e33b8d1853abe Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 21 Apr 2017 16:35:42 +0200 Subject: [PATCH 5/5] feedback --- .../elasticsearch/index/shard/IndexShard.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index e0892bd906b38..1da5e6763bc66 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1076,23 +1076,7 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG; } - boolean assertionsEnabled = false; - assert assertionsEnabled = true; - // TODO: add this for shrinked indices. - if (assertionsEnabled && indexExists) { - final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); - if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) { - // as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point. - // This should have baked into the commit by the primary we recover from, regardless of the index age. - assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : - "recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit"; - } else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE && - indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) { - assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : - "opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID - + " is not found in commit"; - } - } + assert indexExists == false || assertMaxUnsafeAutoIdInCommit(); final EngineConfig config = newEngineConfig(openMode); // we disable deletes since we allow for operations to be executed against the shard while recovering @@ -1108,6 +1092,22 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole } } + private boolean assertMaxUnsafeAutoIdInCommit() throws IOException { + final Map userData = SegmentInfos.readLatestCommit(store.directory()).getUserData(); + if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) { + // as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point. + // This should have baked into the commit by the primary we recover from, regardless of the index age. + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit"; + } else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE && + indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) { + assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) : + "opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + + " is not found in commit"; + } + return true; + } + protected void onNewEngine(Engine newEngine) { refreshListeners.setTranslog(newEngine.getTranslog()); }