Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure changes requests return the latest mapping version #37633

Merged
merged 9 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
ThreadPool threadPool,
Client client,
SettingsModule settingsModule) {
IndexScopedSettings indexScopedSettings = settingsModule.getIndexScopedSettings();
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, indexScopedSettings));
return Collections.singletonList(new ShardFollowTasksExecutor(client, threadPool, clusterService, settingsModule));
}

public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,17 @@ public final class CcrSettings {
Setting.boolSetting("index.xpack.ccr.following_index", false, Property.IndexScope, Property.InternalIndex);

/**
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator and shard follow task should be using.
*/
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);
public static final Setting<TimeValue> CCR_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.wait_for_metadata_timeout", TimeValue.timeValueSeconds(60), Property.NodeScope, Property.Dynamic);

/**
* Dynamic node setting for specifying the wait_for_timeout that the auto follow coordinator should be using.
* TODO: Deprecate and remove this setting
*/
private static final Setting<TimeValue> CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT = Setting.timeSetting(
"ccr.auto_follow.wait_for_metadata_timeout", CCR_WAIT_FOR_METADATA_TIMEOUT, Property.NodeScope, Property.Dynamic);

/**
* Max bytes a node can recover per second.
Expand Down Expand Up @@ -62,7 +68,8 @@ static List<Setting<?>> getSettings() {
CCR_FOLLOWING_INDEX_SETTING,
RECOVERY_MAX_BYTES_PER_SECOND,
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT);
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
CCR_WAIT_FOR_METADATA_TIMEOUT);
}

private final CombinedRateLimiter ccrRateLimiter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ protected boolean removeEldestEntry(final Map.Entry<String, Tuple<Long, Elastics
waitForMetadataTimeOut = newWaitForTimeOut;
}
};
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, updater);
waitForMetadataTimeOut = CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT, updater);
waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settings);
}

public synchronized AutoFollowStats getStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,21 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
final IndexShard indexShard = indexService.getShard(request.getShard().id());
final SeqNoStats seqNoStats = indexShard.seqNoStats();
final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();

final Translog.Operation[] operations = getOperations(
indexShard,
seqNoStats.getGlobalCheckpoint(),
request.getFromSeqNo(),
request.getMaxOperationCount(),
request.getExpectedHistoryUUID(),
request.getMaxBatchSize());
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
// must capture after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
// must capture IndexMetaData after snapshotting operations to ensure the returned mapping version is at least as up-to-date
// as the mapping version that these operations used. Here we must not use IndexMetaData from ClusterService for we expose
// a new cluster state to ClusterApplier(s) before exposing it in the ClusterService.
final IndexMetaData indexMetaData = indexService.getMetaData();
final long mappingVersion = indexMetaData.getMappingVersion();
final long settingsVersion = indexMetaData.getSettingsVersion();
return getResponse(
mappingVersion,
settingsVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void start(
}

// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
updateMapping(followerMappingVersion -> {
updateMapping(0L, followerMappingVersion -> {
synchronized (ShardFollowNodeTask.this) {
currentMappingVersion = followerMappingVersion;
}
Expand Down Expand Up @@ -370,15 +370,15 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse
coordinateReads();
}

private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) {
private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion, Runnable task) {
if (currentMappingVersion >= minimumRequiredMappingVersion) {
LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
task.run();
} else {
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
updateMapping(mappingVersion -> {
updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
currentMappingVersion = mappingVersion;
task.run();
});
Expand All @@ -400,12 +400,13 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
}
}

private void updateMapping(LongConsumer handler) {
updateMapping(handler, new AtomicInteger(0));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler) {
updateMapping(minRequiredMappingVersion, handler, new AtomicInteger(0));
}

private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(handler, e -> handleFailure(e, retryCounter, () -> updateMapping(handler, retryCounter)));
private void updateMapping(long minRequiredMappingVersion, LongConsumer handler, AtomicInteger retryCounter) {
innerUpdateMapping(minRequiredMappingVersion, handler,
e -> handleFailure(e, retryCounter, () -> updateMapping(minRequiredMappingVersion, handler, retryCounter)));
}

private void updateSettings(final LongConsumer handler) {
Expand Down Expand Up @@ -471,7 +472,7 @@ static boolean shouldRetry(String remoteCluster, Exception e) {
}

// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
protected abstract void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.Index;
Expand All @@ -46,6 +48,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
Expand All @@ -69,16 +72,20 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final IndexScopedSettings indexScopedSettings;
private volatile TimeValue waitForMetadataTimeOut;

public ShardFollowTasksExecutor(Client client,
ThreadPool threadPool,
ClusterService clusterService,
IndexScopedSettings indexScopedSettings) {
SettingsModule settingsModule) {
super(ShardFollowTask.NAME, Ccr.CCR_THREAD_POOL_NAME);
this.client = client;
this.threadPool = threadPool;
this.clusterService = clusterService;
this.indexScopedSettings = indexScopedSettings;
this.indexScopedSettings = settingsModule.getIndexScopedSettings();
this.waitForMetadataTimeOut = CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.get(settingsModule.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT,
newVal -> this.waitForMetadataTimeOut = newVal);
}

@Override
Expand Down Expand Up @@ -112,33 +119,25 @@ protected AllocatedPersistentTask createTask(long id, String type, String action
scheduler, System::nanoTime) {

@Override
protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler) {
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
return;
}

assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
errorHandler.accept(e);
}
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
final Index followerIndex = params.getFollowShardId().getIndex();
getIndexMetadata(minRequiredMappingVersion, 0L, params, ActionListener.wrap(
indexMetaData -> {
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
handler.accept(indexMetaData.getMappingVersion());
return;
}
assert indexMetaData.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
},
errorHandler
));
}

@Override
Expand Down Expand Up @@ -257,6 +256,39 @@ private Client remoteClient(ShardFollowTask params) {
return wrapClient(client.getRemoteClusterClient(params.getRemoteCluster()), params.getHeaders());
}

private void getIndexMetadata(long minRequiredMappingVersion, long minRequiredMetadataVersion,
ShardFollowTask params, ActionListener<IndexMetaData> listener) {
final Index leaderIndex = params.getLeaderShardId().getIndex();
final ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
if (minRequiredMetadataVersion > 0) {
clusterStateRequest.waitForMetaDataVersion(minRequiredMetadataVersion).waitForTimeout(waitForMetadataTimeOut);
}
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(
r -> {
// if wait_for_metadata_version timeout, the response is empty
if (r.getState() == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what odd behavior

assert minRequiredMetadataVersion > 0;
getIndexMetadata(minRequiredMappingVersion, minRequiredMetadataVersion, params, listener);
return;
}
final MetaData metaData = r.getState().metaData();
final IndexMetaData indexMetaData = metaData.getIndexSafe(leaderIndex);
if (indexMetaData.getMappingVersion() < minRequiredMappingVersion) {
// ask for the next version.
getIndexMetadata(minRequiredMappingVersion, metaData.version() + 1, params, listener);
} else {
assert metaData.version() >= minRequiredMetadataVersion : metaData.version() + " < " + minRequiredMetadataVersion;
listener.onResponse(indexMetaData);
}
},
listener::onFailure
));
} catch (Exception e) {
listener.onFailure(e);
}
}

interface FollowerStatsInfoHandler {
void accept(String followerHistoryUUID, long globalCheckpoint, long maxSeqNo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedA
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
if (configureRemoteClusterViaNodeSettings() && leaderSeedAddress != null) {
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ protected Settings nodeSettings() {
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
// Let cluster state api return quickly in order to speed up auto follow tests:
builder.put(CcrSettings.CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
builder.put(CcrSettings.CCR_WAIT_FOR_METADATA_TIMEOUT.getKey(), TimeValue.timeValueMillis(100));
return builder.build();
}

Expand Down
Loading