Recovery should not indefinitely retry on mapping error (elastic#41099)
A stuck peer recovery in elastic#40913 revealed that we retry indefinitely on
new cluster states if indexing translog operations hits a mapper
exception. We should not wait and retry if the mapping on the target is
at least as recent as the mapping that the primary used to index the
operations being replayed.

Relates elastic#40913
dnhatn authored and akhil10x5 committed May 2, 2019
1 parent cbf3ad2 commit 64e97c4
Showing 12 changed files with 149 additions and 31 deletions.
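The heart of the change is the retry decision in the translog-operations handler on the recovery target: a MapperException now triggers a wait for a newer cluster state only while the target's mapping version is still behind the mapping version the primary used when indexing the replayed operations. Below is a minimal standalone sketch of that decision, using simplified stand-in types rather than the actual Elasticsearch classes; the real wiring is in the PeerRecoveryTargetService diff further down.

// Minimal sketch of the new retry guard; MapperException here is a local stand-in,
// not org.elasticsearch.index.mapper.MapperException.
final class MappingRetryDecision {

    /**
     * Retrying on a newer cluster state only helps when the failure is a mapping
     * problem and the target's mapping is still older than the mapping the primary
     * used to index the operations being replayed; otherwise fail the recovery.
     */
    static boolean shouldRetryOnNewClusterState(final Throwable failure,
                                                final long mappingVersionOnTarget,
                                                final long mappingVersionOnPrimary) {
        return failure instanceof MapperException
                && mappingVersionOnTarget < mappingVersionOnPrimary;
    }

    /** Local stand-in for the mapper exception type used in this sketch. */
    static final class MapperException extends RuntimeException {
        MapperException(final String message) {
            super(message);
        }
    }
}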
@@ -3148,7 +3148,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
*
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener)
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener)
*/
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
assert seqNo != UNASSIGNED_SEQ_NO
@@ -33,6 +33,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
@@ -119,8 +120,8 @@ public PeerRecoveryTargetService(ThreadPool threadPool, TransportService transpo
RecoveryCleanFilesRequest::new, new CleanFilesRequestHandler());
transportService.registerRequestHandler(Actions.PREPARE_TRANSLOG, ThreadPool.Names.GENERIC,
RecoveryPrepareForTranslogOperationsRequest::new, new PrepareForTranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, RecoveryTranslogOperationsRequest::new, ThreadPool.Names.GENERIC,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.TRANSLOG_OPS, ThreadPool.Names.GENERIC, RecoveryTranslogOperationsRequest::new,
new TranslogOperationsRequestHandler());
transportService.registerRequestHandler(Actions.FINALIZE, RecoveryFinalizeRecoveryRequest::new, ThreadPool.Names.GENERIC, new
FinalizeRecoveryRequestHandler());
transportService.registerRequestHandler(
@@ -501,16 +502,21 @@ public void onTimeout(TimeValue timeout) {
}
});
};
final IndexMetaData indexMetaData = clusterService.state().metaData().index(request.shardId().getIndex());
final long mappingVersionOnTarget = indexMetaData != null ? indexMetaData.getMappingVersion() : 0L;
recoveryTarget.indexTranslogOperations(
request.operations(),
request.totalTranslogOps(),
request.maxSeenAutoIdTimestampOnPrimary(),
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
request.retentionLeases(),
request.mappingVersionOnPrimary(),
ActionListener.wrap(
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
e -> {
if (e instanceof MapperException) {
// do not retry if the mapping on replica is at least as recent as the mapping
// that the primary used to index the operations in the request.
if (mappingVersionOnTarget < request.mappingVersionOnPrimary() && e instanceof MapperException) {
retryOnMappingException.accept(e);
} else {
listener.onFailure(e);
@@ -219,8 +219,9 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
final RetentionLeases retentionLeases = shard.getRetentionLeases();
final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetaData().getMappingVersion();
phase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
retentionLeases, sendSnapshotStep);
retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);
sendSnapshotStep.whenComplete(
r -> IOUtils.close(phase2Snapshot),
e -> {
@@ -510,6 +511,7 @@ void phase2(
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<SendSnapshotResult> listener) throws IOException {
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
@@ -571,6 +573,7 @@
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
batchedListener);
}

@@ -582,6 +585,7 @@ private void sendBatch(
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) throws IOException {
assert ThreadPool.assertCurrentMethodIsNotCalledRecursively();
final List<Translog.Operation> operations = nextBatch.get();
@@ -594,6 +598,7 @@
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
ActionListener.wrap(
newCheckpoint -> {
sendBatch(
@@ -604,6 +609,7 @@
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersionOnPrimary,
listener);
},
listener::onFailure
@@ -320,6 +320,7 @@ public void indexTranslogOperations(
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) {
ActionListener.completeWith(listener, () -> {
final RecoveryState.Translog translog = state().getTranslog();
@@ -351,7 +352,7 @@ public void indexTranslogOperations(
throw new MapperException("mapping updates are not allowed [" + operation + "]");
}
if (result.getFailure() != null) {
if (Assertions.ENABLED) {
if (Assertions.ENABLED && result.getFailure() instanceof MapperException == false) {
throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
}
ExceptionsHelper.reThrowIfNotNull(result.getFailure());
@@ -65,6 +65,11 @@ public interface RecoveryTargetHandler {
* the primary shard when capturing these operations. This value is at least as high as the
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
* @param retentionLeases the retention leases on the primary
* @param mappingVersionOnPrimary the mapping version which is at least as up to date as the mapping version that the
* primary used to index translog {@code operations} in this request.
* If the mapping version on the replica is not older than this version, we should not retry on
* {@link org.elasticsearch.index.mapper.MapperException}; otherwise we should wait for a
* new mapping then retry.
* @param listener a listener which will be notified with the local checkpoint on the target
* after these operations are successfully indexed on the target.
*/
Expand All @@ -74,6 +79,7 @@ void indexTranslogOperations(
long maxSeenAutoIdTimestampOnPrimary,
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
RetentionLeases retentionLeases,
long mappingVersionOnPrimary,
ActionListener<Long> listener);

/**
@@ -19,6 +19,7 @@

package org.elasticsearch.indices.recovery;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.RetentionLeases;
@@ -31,16 +32,14 @@

public class RecoveryTranslogOperationsRequest extends TransportRequest {

private long recoveryId;
private ShardId shardId;
private List<Translog.Operation> operations;
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
private long maxSeenAutoIdTimestampOnPrimary;
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
private RetentionLeases retentionLeases;

public RecoveryTranslogOperationsRequest() {
}
private final long recoveryId;
private final ShardId shardId;
private final List<Translog.Operation> operations;
private final int totalTranslogOps;
private final long maxSeenAutoIdTimestampOnPrimary;
private final long maxSeqNoOfUpdatesOrDeletesOnPrimary;
private final RetentionLeases retentionLeases;
private final long mappingVersionOnPrimary;

RecoveryTranslogOperationsRequest(
final long recoveryId,
@@ -49,14 +48,16 @@ public RecoveryTranslogOperationsRequest() {
final int totalTranslogOps,
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
final RetentionLeases retentionLeases) {
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.operations = operations;
this.totalTranslogOps = totalTranslogOps;
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
this.retentionLeases = retentionLeases;
this.mappingVersionOnPrimary = mappingVersionOnPrimary;
}

public long recoveryId() {
@@ -87,8 +88,16 @@ public RetentionLeases retentionLeases() {
return retentionLeases;
}

@Override
public void readFrom(StreamInput in) throws IOException {
/**
* Returns the mapping version which is at least as up to date as the mapping version that the primary used to index
* the translog operations in this request. If the mapping version on the replica is not older than this version, we should not
* retry on {@link org.elasticsearch.index.mapper.MapperException}; otherwise we should wait for a new mapping then retry.
*/
long mappingVersionOnPrimary() {
return mappingVersionOnPrimary;
}

RecoveryTranslogOperationsRequest(StreamInput in) throws IOException {
super.readFrom(in);
recoveryId = in.readLong();
shardId = ShardId.readShardId(in);
@@ -97,6 +106,11 @@ public void readFrom(StreamInput in) throws IOException {
maxSeenAutoIdTimestampOnPrimary = in.readZLong();
maxSeqNoOfUpdatesOrDeletesOnPrimary = in.readZLong();
retentionLeases = new RetentionLeases(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
mappingVersionOnPrimary = in.readVLong();
} else {
mappingVersionOnPrimary = Long.MAX_VALUE;
}
}

@Override
@@ -109,5 +123,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeZLong(maxSeenAutoIdTimestampOnPrimary);
out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary);
retentionLeases.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeVLong(mappingVersionOnPrimary);
}
}

@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}
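A side note on the wire-format change in the request class above: the new field is only written when the stream version is 8.0.0 or later, and when reading from an older sender the request falls back to Long.MAX_VALUE, so recoveries from pre-8.0 primaries keep the old behaviour of always retrying on a MapperException. A rough sketch of that pattern, using plain java.io streams and a hypothetical integer wire version in place of Elasticsearch's StreamInput/StreamOutput and Version.V_8_0_0:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Rough sketch only: WIRE_VERSION_8_0_0 and the Data*Stream types are stand-ins for
// Elasticsearch's Version.V_8_0_0 and StreamInput/StreamOutput.
final class MappingVersionWireFormat {

    static final int WIRE_VERSION_8_0_0 = 8_00_00_99;

    static void write(final DataOutputStream out, final int receiverVersion,
                      final long mappingVersionOnPrimary) throws IOException {
        if (receiverVersion >= WIRE_VERSION_8_0_0) {
            out.writeLong(mappingVersionOnPrimary); // the real code uses writeVLong
        }
    }

    static long read(final DataInputStream in, final int senderVersion) throws IOException {
        if (senderVersion >= WIRE_VERSION_8_0_0) {
            return in.readLong();
        }
        // Older sender did not send the field: assume the most recent possible mapping
        // version so the target still retries on mapping exceptions, as it did before.
        return Long.MAX_VALUE;
    }
}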
@@ -112,6 +112,7 @@ public void indexTranslogOperations(
final long maxSeenAutoIdTimestampOnPrimary,
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
final RetentionLeases retentionLeases,
final long mappingVersionOnPrimary,
final ActionListener<Long> listener) {
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
recoveryId,
@@ -120,7 +121,8 @@
totalTranslogOps,
maxSeenAutoIdTimestampOnPrimary,
maxSeqNoOfDeletesOrUpdatesOnPrimary,
retentionLeases);
retentionLeases,
mappingVersionOnPrimary);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint),
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
@@ -564,6 +564,7 @@ public void indexTranslogOperations(
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdates,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
// index a doc which is not part of the snapshot, but also does not complete on replica
replicaEngineFactory.latchIndexers(1);
@@ -597,6 +598,7 @@
maxAutoIdTimestamp,
maxSeqNoOfUpdates,
retentionLeases,
mappingVersion,
listener);
}
});
@@ -845,11 +847,13 @@ public void indexTranslogOperations(
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdates,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
if (hasBlocked() == false) {
blockIfNeeded(RecoveryState.Stage.TRANSLOG);
}
super.indexTranslogOperations(operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, listener);
super.indexTranslogOperations(
operations, totalTranslogOps, maxAutoIdTimestamp, maxSeqNoOfUpdates, retentionLeases, mappingVersion, listener);
}

@Override
@@ -2466,13 +2466,15 @@ public void indexTranslogOperations(
final long maxSeenAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener){
super.indexTranslogOperations(
operations,
totalTranslogOps,
maxSeenAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
ActionListener.wrap(
r -> {
assertFalse(replica.isSyncNeeded());
@@ -2588,13 +2590,15 @@ public void indexTranslogOperations(
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener){
super.indexTranslogOperations(
operations,
totalTranslogOps,
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
ActionListener.wrap(
checkpoint -> {
listener.onResponse(checkpoint);
@@ -2653,13 +2657,15 @@ public void indexTranslogOperations(
final long maxAutoIdTimestamp,
final long maxSeqNoOfUpdatesOrDeletes,
final RetentionLeases retentionLeases,
final long mappingVersion,
final ActionListener<Long> listener) {
super.indexTranslogOperations(
operations,
totalTranslogOps,
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
mappingVersion,
ActionListener.wrap(
r -> {
assertListenerCalled.accept(replica);