Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peer Recovery: remove maxUnsafeAutoIdTimestamp hand off #24243

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
Expand Down Expand Up @@ -67,7 +66,6 @@ public final class EngineConfig {
private final Engine.EventListener eventListener;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
private final long maxUnsafeAutoIdTimestamp;
@Nullable
private final ReferenceManager.RefreshListener refreshListeners;
@Nullable
Expand Down Expand Up @@ -116,7 +114,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
long maxUnsafeAutoIdTimestamp, Sort indexSort) {
Sort indexSort) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
}
Expand All @@ -143,9 +141,6 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
this.flushMergesAfter = flushMergesAfter;
this.openMode = openMode;
this.refreshListeners = refreshListeners;
assert maxUnsafeAutoIdTimestamp >= IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP :
"maxUnsafeAutoIdTimestamp must be >= -1 but was " + maxUnsafeAutoIdTimestamp;
this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
this.indexSort = indexSort;
}

Expand Down Expand Up @@ -333,11 +328,10 @@ public ReferenceManager.RefreshListener getRefreshListeners() {
}

/**
* Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
* Returns true if the engine is allowed to optimize indexing operations with an auto-generated ID
*/
public long getMaxUnsafeAutoIdTimestamp() {
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS) ? maxUnsafeAutoIdTimestamp : Long.MAX_VALUE;
public boolean isAutoGeneratedIDsOptimizationEnabled() {
return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,16 @@ public class InternalEngine extends Engine {
private final AtomicInteger throttleRequestCount = new AtomicInteger();
private final EngineConfig.OpenMode openMode;
private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
private static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
private final CounterMetric numVersionLookups = new CounterMetric();
private final CounterMetric numIndexVersionsLookups = new CounterMetric();

public InternalEngine(EngineConfig engineConfig) throws EngineException {
super(engineConfig);
openMode = engineConfig.getOpenMode();
if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_beta1)) {
// no optimization for pre 5.0.0.alpha6 since translog might not have all information needed
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
} else {
maxUnsafeAutoIdTimestamp.set(engineConfig.getMaxUnsafeAutoIdTimestamp());
}
this.versionMap = new LiveVersionMap();
store.incRef();
Expand Down Expand Up @@ -1836,7 +1833,7 @@ public void onSettingsChanged() {
mergeScheduler.refreshConfig();
// config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
maybePruneDeletedTombstones();
if (engineConfig.getMaxUnsafeAutoIdTimestamp() == Long.MAX_VALUE) {
if (engineConfig.isAutoGeneratedIDsOptimizationEnabled() == false) {
// this is an anti-viral setting: you can only opt out for the entire index
// only if a shard starts up again due to relocation or if the index is closed
// the setting will be re-interpreted if it's set to true
Expand Down
38 changes: 29 additions & 9 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
Expand All @@ -38,11 +39,11 @@
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.Segment;
Expand Down Expand Up @@ -1040,11 +1042,11 @@ public void performTranslogRecovery(boolean indexExists) throws IOException {
translogStats.totalOperations(0);
translogStats.totalOperationsOnStart(0);
}
internalPerformTranslogRecovery(false, indexExists, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
internalPerformTranslogRecovery(false, indexExists);
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
}

private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
Expand Down Expand Up @@ -1073,7 +1075,26 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
} else {
openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
}
final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);

boolean assertionsEnabled = false;
assert assertionsEnabled = true;
// TODO: add this for shrunk indices.
if (assertionsEnabled && indexExists) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move this into a method private boolean assertSomething() and then do assert assertSomething();

final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.PEER) {
// as of 5.5.0, the engine stores the maxUnsafeAutoIdTimestamp in the commit point.
// This should have been baked into the commit by the primary we recover from, regardless of the index age.
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"recovery from remote but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID + " is not found in commit";
} else if (recoveryState().getRecoverySource().getType() == RecoverySource.Type.EXISTING_STORE &&
indexSettings.getIndexVersionCreated().onOrAfter(Version.V_5_5_0_UNRELEASED)) {
assert userData.containsKey(InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID) :
"opening index which was created post 5.5.0 but " + InternalEngine.MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID
+ " is not found in commit";
}
}

final EngineConfig config = newEngineConfig(openMode);
// we disable deletes since we allow for operations to be executed against the shard while recovering
// but we need to make sure we don't lose deletes until we are done recovering
config.setEnableGcDeletes(false);
Expand All @@ -1096,9 +1117,9 @@ protected void onNewEngine(Engine newEngine) {
* the replay of the transaction log which is required in cases where we restore a previous index or recover from
* a remote peer.
*/
public void skipTranslogRecovery(long maxUnsafeAutoIdTimestamp) throws IOException {
public void skipTranslogRecovery() throws IOException {
assert getEngineOrNull() == null : "engine was already created";
internalPerformTranslogRecovery(true, true, maxUnsafeAutoIdTimestamp);
internalPerformTranslogRecovery(true, true);
assert recoveryState.getTranslog().recoveredOperations() == 0;
}

Expand Down Expand Up @@ -1795,14 +1816,13 @@ private DocumentMapperForType docMapper(String type) {
return mapperService.documentMapperWithAutoCreate(type);
}

private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, long maxUnsafeAutoIdTimestamp) {
private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
Sort indexSort = indexSortSupplier.get();
return new EngineConfig(openMode, shardId,
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners,
maxUnsafeAutoIdTimestamp, indexSort);
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners, indexSort);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -353,7 +352,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe
recoveryState.getIndex().updateVersion(version);
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
indexShard.skipTranslogRecovery();
} else {
// since we recover from local, just fill the files and size
try {
Expand Down Expand Up @@ -405,7 +404,7 @@ private void restore(final IndexShard indexShard, final Repository repository, f
}
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
indexShard.skipTranslogRecovery();
indexShard.finalizeRecovery();
indexShard.postRecovery("restore done");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ class PrepareForTranslogOperationsRequestHandler implements TransportRequestHand
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp());
recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps());
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -29,19 +30,17 @@

public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {

private long maxUnsafeAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
private long recoveryId;
private ShardId shardId;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;

public RecoveryPrepareForTranslogOperationsRequest() {
}

RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, long maxUnsafeAutoIdTimestamp) {
RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.totalTranslogOps = totalTranslogOps;
this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
}

public long recoveryId() {
Expand All @@ -56,17 +55,15 @@ public int totalTranslogOps() {
return totalTranslogOps;
}

public long getMaxUnsafeAutoIdTimestamp() {
return maxUnsafeAutoIdTimestamp;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
totalTranslogOps = in.readVInt();
maxUnsafeAutoIdTimestamp = in.readLong();
if (in.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
in.readLong(); // maxUnsafeAutoIdTimestamp
}
}

@Override
Expand All @@ -75,6 +72,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeVInt(totalTranslogOps);
out.writeLong(maxUnsafeAutoIdTimestamp);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeLong(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP); // maxUnsafeAutoIdTimestamp
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public RecoveryResponse recoverToTarget() throws IOException {
}

try {
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
prepareTargetForTranslog(translogView.totalOperations());
} catch (final Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e);
}
Expand Down Expand Up @@ -389,13 +389,13 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView)
}
}

void prepareTargetForTranslog(final int totalTranslogOps, final long maxUnsafeAutoIdTimestamp) throws IOException {
void prepareTargetForTranslog(final int totalTranslogOps) throws IOException {
StopWatch stopWatch = new StopWatch().start();
logger.trace("recovery [phase1]: prepare remote engine for translog");
final long startEngineStart = stopWatch.totalTime().millis();
// Send a request preparing the new shard's translog to receive operations. This ensures the shard engine is started and disables
// garbage collection (not the JVM's GC!) of tombstone deletes.
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps, maxUnsafeAutoIdTimestamp));
cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
stopWatch.stop();

response.startTime = stopWatch.totalTime().millis() - startEngineStart;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
Expand All @@ -49,7 +48,6 @@
import org.elasticsearch.index.translog.Translog;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand All @@ -58,8 +56,6 @@
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -360,9 +356,9 @@ private void ensureRefCount() {
/*** Implementation of {@link RecoveryTargetHandler } */

@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
state().getTranslog().totalOperations(totalTranslogOps);
indexShard().skipTranslogRecovery(maxUnsafeAutoIdTimestamp);
indexShard().skipTranslogRecovery();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@ public interface RecoveryTargetHandler {
* Prepares the target to receive translog operations, after all files have been copied
*
* @param totalTranslogOps total translog operations expected to be sent
* @param maxUnsafeAutoIdTimestamp the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
*/
void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;
void prepareForTranslogOperations(int totalTranslogOps) throws IOException;

/**
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ public RemoteRecoveryTargetHandler(long recoveryId, ShardId shardId, String targ
}

@Override
public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, maxUnsafeAutoIdTimestamp),
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
Expand Down
Loading