Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Ensure changes requests return the latest mapping version #37633

Merged
merged 9 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final List<IndexEventListener> buildInIndexListener;
private final PrimaryReplicaSyncer primaryReplicaSyncer;
private final Consumer<ShardId> globalCheckpointSyncer;
private volatile ClusterState clusterState; // the latest applied cluster state

@Inject
public IndicesClusterStateService(
Expand Down Expand Up @@ -229,6 +230,7 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
return;
}

this.clusterState = state;
updateFailedShardsCache(state);

deleteIndices(event); // also deletes shards of deleted indices
Expand All @@ -246,6 +248,13 @@ public synchronized void applyClusterState(final ClusterChangedEvent event) {
createOrUpdateShards(state);
}

/**
 * Returns the most recently applied cluster state.
 *
 * @return the latest cluster state recorded by {@code applyClusterState}; may lag the
 *         state exposed by the cluster service, since it is set during cluster-state application
 */
public ClusterState getClusterState() {
    return this.clusterState;
}

/**
* Removes shard entries from the failed shards cache that are no longer allocated to this node by the master.
* Sends shard failures for shards that are marked as actively allocated to this node but don't actually exist on the node.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
Expand All @@ -18,6 +19,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand All @@ -34,6 +36,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -212,6 +215,12 @@ public long getSettingsVersion() {
return settingsVersion;
}

// Version of the leader cluster-state metadata captured when this response was built;
// 0L when the response came from a pre-7.0 node that does not send this field (see readFrom).
private long metadataVersion;

public long getMetadataVersion() {
    return metadataVersion;
}

private long globalCheckpoint;

public long getGlobalCheckpoint() {
Expand Down Expand Up @@ -248,6 +257,7 @@ public long getTookInMillis() {
Response(
final long mappingVersion,
final long settingsVersion,
final long metadataVersion,
final long globalCheckpoint,
final long maxSeqNo,
final long maxSeqNoOfUpdatesOrDeletes,
Expand All @@ -256,6 +266,7 @@ public long getTookInMillis() {

this.mappingVersion = mappingVersion;
this.settingsVersion = settingsVersion;
this.metadataVersion = metadataVersion;
this.globalCheckpoint = globalCheckpoint;
this.maxSeqNo = maxSeqNo;
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
Expand All @@ -268,6 +279,11 @@ public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
mappingVersion = in.readVLong();
settingsVersion = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
metadataVersion = in.readVLong();
} else {
metadataVersion = 0L;
}
globalCheckpoint = in.readZLong();
maxSeqNo = in.readZLong();
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
Expand All @@ -280,6 +296,9 @@ public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeVLong(metadataVersion);
}
out.writeZLong(globalCheckpoint);
out.writeZLong(maxSeqNo);
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
Expand All @@ -294,6 +313,7 @@ public boolean equals(final Object o) {
final Response that = (Response) o;
return mappingVersion == that.mappingVersion &&
settingsVersion == that.settingsVersion &&
metadataVersion == that.metadataVersion &&
globalCheckpoint == that.globalCheckpoint &&
maxSeqNo == that.maxSeqNo &&
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
Expand All @@ -306,6 +326,7 @@ public int hashCode() {
return Objects.hash(
mappingVersion,
settingsVersion,
metadataVersion,
globalCheckpoint,
maxSeqNo,
maxSeqNoOfUpdatesOrDeletes,
Expand All @@ -317,40 +338,47 @@ public int hashCode() {
public static class TransportAction extends TransportSingleShardAction<Request, Response> {

private final IndicesService indicesService;
private final IndicesClusterStateService indicesClusterStateService;

@Inject
public TransportAction(ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
IndicesService indicesService) {
IndicesService indicesService,
IndicesClusterStateService indicesClusterStateService) {
super(NAME, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, ThreadPool.Names.SEARCH);
this.indicesService = indicesService;
this.indicesClusterStateService = indicesClusterStateService;
}

@Override
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
// as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose
// a new cluster state to ClusterApplier(s) before exposing it in the ClusterService.
final MetaData metaData = indicesClusterStateService.getClusterState().metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
return getResponse(
mappingVersion,
settingsVersion,
metaData.version(),
seqNoStats,
maxSeqNoOfUpdatesOrDeletes,
operations,
Expand Down Expand Up @@ -430,7 +458,8 @@ private void globalCheckpointAdvancementFailure(
e);
if (e instanceof TimeoutException) {
try {
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final MetaData metaData = indicesClusterStateService.getClusterState().metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
Expand All @@ -439,6 +468,7 @@ private void globalCheckpointAdvancementFailure(
getResponse(
mappingVersion,
settingsVersion,
metaData.version(),
latestSeqNoStats,
maxSeqNoOfUpdatesOrDeletes,
EMPTY_OPERATIONS_ARRAY,
Expand Down Expand Up @@ -530,6 +560,7 @@ static Translog.Operation[] getOperations(
static Response getResponse(
final long mappingVersion,
final long settingsVersion,
final long metadataVersion,
final SeqNoStats seqNoStats,
final long maxSeqNoOfUpdates,
final Translog.Operation[] operations,
Expand All @@ -539,6 +570,7 @@ static Response getResponse(
return new Response(
mappingVersion,
settingsVersion,
metadataVersion,
seqNoStats.getGlobalCheckpoint(),
seqNoStats.getMaxSeqNo(),
maxSeqNoOfUpdates,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void start(
}

// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
updateMapping(followerMappingVersion -> {
updateMapping(0L, 0L, followerMappingVersion -> {
synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = followerMappingVersion;
}
Expand Down Expand Up @@ -285,7 +285,8 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
// 3) handle read response:
Runnable handleResponseTask = () -> innerHandleReadResponse(from, maxRequiredSeqNo, response);
// 2) update follow index mapping:
Runnable updateMappingsTask = () -> maybeUpdateMapping(response.getMappingVersion(), handleResponseTask);
Runnable updateMappingsTask = () -> maybeUpdateMapping(
response.getMappingVersion(), response.getMetadataVersion(), handleResponseTask);
// 1) update follow index settings:
maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
}
Expand Down Expand Up @@ -370,15 +371,15 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse
coordinateReads();
}

private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) {
private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion, long minRequiredMetadataVersion, Runnable task) {
if (currentMappingVersion >= minimumRequiredMappingVersion) {
LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
task.run();
} else {
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
updateMapping(mappingVersion -> {
updateMapping(minimumRequiredMappingVersion, minRequiredMetadataVersion, mappingVersion -> {
currentMappingVersion = mappingVersion;
task.run();
});
Expand All @@ -400,12 +401,15 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
}
}

private void updateMapping(LongConsumer handler) {
updateMapping(handler, new AtomicInteger(0));
private void updateMapping(long minRequiredMappingVersion, long minRequiredMetadataVersion, LongConsumer handler) {
updateMapping(minRequiredMappingVersion, minRequiredMetadataVersion, handler, new AtomicInteger(0));
}

private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
private void updateMapping(long minRequiredMappingVersion, long minRequiredMetadataVersion,
LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(minRequiredMappingVersion, minRequiredMetadataVersion, handler,
e -> handleFailure(
e, retryCounter, () -> updateMapping(minRequiredMappingVersion, minRequiredMetadataVersion, handler, retryCounter)));
}

private void updateSettings(final LongConsumer handler) {
Expand Down Expand Up @@ -471,7 +475,8 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
}

// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
protected abstract void innerUpdateMapping(long minRequiredMappingVersion, long minRequiredMetadataVersion,
LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,24 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
protected void innerUpdateMapping(long minRequiredMappingVersion, long minRequiredMetadataVersion,
LongConsumer handler, Consumer<Exception> errorHandler) {
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
if (minRequiredMetadataVersion > 0) {
clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion)
.waitForTimeout(TimeValue.timeValueNanos(Long.MAX_VALUE));
}
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
// this may return an outdated version if we did not have wait_for_metadata_version.
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we had the metadata version (which is also updated whenever the index metadata or mappings change), could we just do a waitForMetaDataVersion? That would avoid a possible busy loop.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove this retry in 7.0 after backporting to 6.x

assert minRequiredMetadataVersion == 0 : minRequiredMappingVersion;
innerUpdateMapping(minRequiredMappingVersion, minRequiredMetadataVersion, handler, errorHandler);
return;
}
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
Expand Down
Loading