CCR: Replicate existing ops with old term on follower #34412

Merged: 15 commits, Oct 19, 2018
File: InternalEngine.java
@@ -2410,9 +2410,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
// TODO: Should we defer the refresh until we really need it?
ensureOpen();
if (lastRefreshedCheckpoint() < toSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
@@ -2522,6 +2520,15 @@ final long lastRefreshedCheckpoint() {
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
}

/**
* Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint.
*/
protected final void refreshIfNeeded(String source, long requestingSeqNo) {
if (lastRefreshedCheckpoint() < requestingSeqNo) {
refresh(source, SearcherScope.INTERNAL);
}
}

private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
final AtomicLong refreshedCheckpoint;
private long pendingCheckpoint;
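For illustration, here is a minimal, self-contained sketch of the refresh-if-needed pattern that the extracted helper captures. The class below is hypothetical and not part of the diff; in the real engine the checkpoint is advanced by a ReferenceManager.RefreshListener (see LastRefreshedCheckpointListener above) and the refresh is the engine's internal refresh.

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the engine's refresh bookkeeping: trigger a
// refresh only when a reader requests operations beyond the checkpoint
// that the last completed refresh made visible.
final class RefreshIfNeededSketch {
    private final AtomicLong refreshedCheckpoint = new AtomicLong(-1); // -1 = no ops performed

    long lastRefreshedCheckpoint() {
        return refreshedCheckpoint.get();
    }

    // Invoked by a refresh listener once a refresh completes.
    void afterRefresh(long checkpointAtRefreshTime) {
        refreshedCheckpoint.accumulateAndGet(checkpointAtRefreshTime, Math::max);
    }

    // Mirrors refreshIfNeeded(source, requestingSeqNo) from the diff above.
    void refreshIfNeeded(long requestingSeqNo, Runnable internalRefresh) {
        if (lastRefreshedCheckpoint() < requestingSeqNo) {
            internalRefresh.run();
        }
    }
}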
File: EngineTestCase.java
@@ -49,29 +49,36 @@
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MapperTestUtils;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.SourceFieldMapper;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -307,6 +314,27 @@ protected static ParsedDocument testParsedDocument(
mappingUpdate);
}

public static CheckedFunction<String, ParsedDocument, IOException> nestedParsedDocFactory() throws Exception {
final MapperService mapperService = createMapperService("type");
final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type")
.startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject()
.endObject().endObject());
final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping));
return docId -> {
final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value");
final int nestedValues = between(0, 3);
if (nestedValues > 0) {
XContentBuilder nestedField = source.startObject("nested_field");
for (int i = 0; i < nestedValues; i++) {
nestedField.field("field-" + i, "value-" + i);
}
source.endObject();
}
source.endObject();
return nestedMapper.parse(SourceToParse.source("test", "type", docId, BytesReference.bytes(source), XContentType.JSON));
};
}

/**
* Creates a tombstone document that only includes uid, seq#, term and version fields.
*/
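A hypothetical usage sketch of the new nestedParsedDocFactory helper from inside an EngineTestCase subclass; engine and indexForDoc are assumed to come from the test scaffolding, and the loop bound is arbitrary.

// Sketch only: index a few randomly nested documents into the test engine.
CheckedFunction<String, ParsedDocument, IOException> docFactory = nestedParsedDocFactory();
for (int i = 0; i < 10; i++) {
    ParsedDocument doc = docFactory.apply(Integer.toString(i));
    // A document with nested objects parses into several Lucene documents
    // (one per nested object plus the root), which changes-snapshot tests
    // need to handle when mapping Lucene doc IDs back to operations.
    engine.index(indexForDoc(doc));
}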
File: TransportBulkShardOperationsAction.java
@@ -31,7 +31,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class TransportBulkShardOperationsAction
extends TransportWriteAction<BulkShardOperationsRequest, BulkShardOperationsRequest, BulkShardOperationsResponse> {
@@ -68,6 +67,41 @@ protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse>
request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
}

static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) {
final Translog.Operation operationWithPrimaryTerm;
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
operationWithPrimaryTerm = new Translog.Index(
index.type(),
index.id(),
index.seqNo(),
primaryTerm,
index.version(),
BytesReference.toBytes(index.source()),
index.routing(),
index.getAutoGeneratedIdTimestamp());
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
operationWithPrimaryTerm = new Translog.Delete(
delete.type(),
delete.id(),
delete.uid(),
delete.seqNo(),
primaryTerm,
delete.version());
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason());
break;
default:
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
}
return operationWithPrimaryTerm;
}

// public for testing purposes only
public static CcrWritePrimaryResult shardOperationOnPrimary(
final ShardId shardId,
@@ -81,73 +115,42 @@ public static CcrWritePrimaryResult shardOperationOnPrimary(
"], actual [" + primary.getHistoryUUID() + "], shard is likely restored from snapshot or force allocated");
}

final Function<Translog.Operation, Translog.Operation> rewriteWithTerm = operation -> {
final Translog.Operation operationWithPrimaryTerm;
switch (operation.opType()) {
case INDEX:
final Translog.Index index = (Translog.Index) operation;
operationWithPrimaryTerm = new Translog.Index(
index.type(),
index.id(),
index.seqNo(),
primary.getOperationPrimaryTerm(),
index.version(),
BytesReference.toBytes(index.source()),
index.routing(),
index.getAutoGeneratedIdTimestamp());
break;
case DELETE:
final Translog.Delete delete = (Translog.Delete) operation;
operationWithPrimaryTerm = new Translog.Delete(
delete.type(),
delete.id(),
delete.uid(),
delete.seqNo(),
primary.getOperationPrimaryTerm(),
delete.version());
break;
case NO_OP:
final Translog.NoOp noOp = (Translog.NoOp) operation;
operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getOperationPrimaryTerm(), noOp.reason());
break;
default:
throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]");
}
return operationWithPrimaryTerm;
};

assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]";
primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);

final List<Translog.Operation> appliedOperations = new ArrayList<>(sourceOperations.size());
Translog.Location location = null;
long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
for (Translog.Operation sourceOp : sourceOperations) {
final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp);
final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm());
final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY);
if (result.getResultType() == Engine.Result.Type.SUCCESS) {
assert result.getSeqNo() == targetOp.seqNo();
appliedOperations.add(targetOp);
location = locationToSync(location, result.getTranslogLocation());
} else {
if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) {
// Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery.
// The primary must not acknowledge this request until the global checkpoint is at least the highest
// seqno of all skipped operations (i.e., all skipped operations have been processed on every replica).
waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo());
// The existing operations below the global checkpoint won't be replicated, as they were already
// processed on every replica. However, the existing operations above the global checkpoint will be
// replicated to replicas, but with the existing primary term (not the current primary term) in order
// to guarantee consistency between the primary and replicas, and between the translog and the Lucene index.
final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure();
assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo();
if (failure.getExistingPrimaryTerm().isPresent()) {
appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong()));
} else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) {
assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint();
throw new IllegalStateException("can't find primary_term for existing op=" + targetOp +
" global_checkpoint=" + primary.getGlobalCheckpoint(), failure);
}
} else {
assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]";
throw ExceptionsHelper.convertToElastic(result.getFailure());
}
}
}
assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
"waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint +
" source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size();
assert appliedOperations.size() == 0 || location != null;
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes);
return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger);
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
}

@Override
@@ -184,12 +187,8 @@ protected BulkShardOperationsResponse newResponseInstance() {
* Custom write result to include global checkpoint after ops have been replicated.
*/
static final class CcrWritePrimaryResult extends WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> {
final long waitingForGlobalCheckpoint;

CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary,
long waitingForGlobalCheckpoint, Logger logger) {
CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) {
super(request, new BulkShardOperationsResponse(), location, null, primary, logger);
this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint;
}

@Override
@@ -201,19 +200,7 @@ public synchronized void respond(ActionListener<BulkShardOperationsResponse> listener) {
response.setMaxSeqNo(seqNoStats.getMaxSeqNo());
listener.onResponse(response);
}, listener::onFailure);

if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) {
primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> {
if (e != null) {
listener.onFailure(e);
} else {
assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp;
super.respond(wrappedListener);
}
}, null);
} else {
super.respond(wrappedListener);
}
super.respond(wrappedListener);
}

}
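The branch added above can be summarized by the following condensed, hypothetical helper (not part of the diff): given the term of the existing copy, if any, and the global checkpoint, it returns the term under which an already-processed operation should be replicated, or empty if the operation can be dropped from the replica request.

import java.util.OptionalLong;

// Hypothetical restatement of the already-processed handling in
// shardOperationOnPrimary; the name is invented for illustration.
static OptionalLong termToReplicateExistingOp(long seqNo, OptionalLong existingPrimaryTerm, long globalCheckpoint) {
    if (existingPrimaryTerm.isPresent()) {
        // Replicate the op under the term of its existing copy so the translog
        // and Lucene index stay consistent between the primary and replicas.
        return existingPrimaryTerm;
    }
    if (seqNo <= globalCheckpoint) {
        // Already processed on every in-sync copy; no need to replicate.
        return OptionalLong.empty();
    }
    // Above the global checkpoint the existing term must be recoverable.
    throw new IllegalStateException("can't find primary_term for existing op; seq_no=" + seqNo);
}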
File: AlreadyProcessedFollowingEngineException.java
@@ -9,8 +9,27 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.ShardId;

import java.util.OptionalLong;

/**
* An exception indicating that an operation was already processed on the {@link FollowingEngine} of the primary of a follower.
* The field {@code existingPrimaryTerm} is empty only if the operation is below the global checkpoint; otherwise it should be non-empty.
*/
public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException {
AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) {
super(shardId, "operation [{}] was processed before", null, seqNo);
private final long seqNo;
private final OptionalLong existingPrimaryTerm;

AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo, OptionalLong existingPrimaryTerm) {
super(shardId, "operation [{}] was processed before with term [{}]", null, seqNo, existingPrimaryTerm);
this.seqNo = seqNo;
this.existingPrimaryTerm = existingPrimaryTerm;
}

public long getSeqNo() {
return seqNo;
}

public OptionalLong getExistingPrimaryTerm() {
return existingPrimaryTerm;
}
}
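For context, a hypothetical sketch of how the two constructor cases are intended to be used at the throw site. The real call sites live in FollowingEngine, which is not part of this excerpt, and lookupTermOfExistingCopy is an invented name for the Lucene term lookup.

// Sketch only, not part of the diff.
if (seqNo <= globalCheckpoint) {
    // Below the global checkpoint no term lookup is needed: the primary
    // simply skips the operation.
    throw new AlreadyProcessedFollowingEngineException(shardId, seqNo, OptionalLong.empty());
} else {
    // Above the global checkpoint, report the term of the existing copy so
    // the primary can replicate the operation under that term.
    throw new AlreadyProcessedFollowingEngineException(shardId, seqNo,
        OptionalLong.of(lookupTermOfExistingCopy(seqNo)));
}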