Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CcrRepository to init follower index (#35719) #37988

Merged
merged 6 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final String clusterAlias,
final String leaderIndex,
final Consumer<Exception> onFailure,
final BiConsumer<String[], IndexMetaData> consumer) {
final BiConsumer<String[], Tuple<ClusterState, IndexMetaData>> consumer) {

final ClusterStateRequest request = new ClusterStateRequest();
request.clear();
Expand All @@ -134,7 +135,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
hasPrivilegesToFollowIndices(remoteClient, new String[] {leaderIndex}, e -> {
if (e == null) {
fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, onFailure, historyUUIDs ->
consumer.accept(historyUUIDs, leaderIndexMetaData));
consumer.accept(historyUUIDs, new Tuple<>(remoteClusterState, leaderIndexMetaData)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to set .nodes(true) on the cluster state request in order to get the minNodeVersion? Why is this not failing any tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nodes defaults to true. But I will set it explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh well I see that we set clear() which should reset it. I'll investigate why the test did not fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason that the test was passing (I believe based on reproductions locally) is that the rolling upgrade uses the local cluster as the remote cluster. So the check that your local cluster was mixed with pre-6.7 was pushing us to use compatibility mode. I changed the remote cluster state request to request nodes(true).

This does raise questions about testing, do we need to invest in rolling upgrade tests that use two different cluster and have a test where your local cluster is all 6.7 and the remote cluster is mixed?

} else {
onFailure.accept(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;

import java.util.HashMap;
import java.util.Map;

final class Pre67PutFollow {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 too easy to forget about bwc.
Did you notice the ccr tests in rolling upgrade tests fail when this was missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Not always but sometimes.


private static final Logger logger = LogManager.getLogger(Pre67PutFollow.class);

private final Client client;
private final ClusterService clusterService;
private final AllocationService allocationService;
private final ActiveShardsObserver activeShardsObserver;

Pre67PutFollow(final Client client, final ClusterService clusterService, final AllocationService allocationService,
final ActiveShardsObserver activeShardsObserver) {
this.client = client;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.activeShardsObserver = activeShardsObserver;
}

void doPre67PutFollow(final PutFollowAction.Request request, final IndexMetaData leaderIndexMetaData, String[] historyUUIDs,
final ActionListener<PutFollowAction.Response> listener) {
ActionListener<Boolean> handler = ActionListener.wrap(
result -> {
if (result) {
initiateFollowing(request, listener);
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
},
listener::onFailure);
// Can't use create index api here, because then index templates can alter the mappings / settings.
// And index templates could introduce settings / mappings that are incompatible with the leader index.
clusterService.submitStateUpdateTask("create_following_index", new AckedClusterStateUpdateTask<Boolean>(request, handler) {

@Override
protected Boolean newResponse(final boolean acknowledged) {
return acknowledged;
}

@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
String followIndex = request.getFollowRequest().getFollowerIndex();
IndexMetaData currentIndex = currentState.metaData().index(followIndex);
if (currentIndex != null) {
throw new ResourceAlreadyExistsException(currentIndex.getIndex());
}

MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(followIndex);

// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, request.getRemoteCluster());
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);

// Copy all settings, but overwrite a few settings.
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(leaderIndexMetaData.getSettings());
// Overwriting UUID here, because otherwise we can't follow indices in the same cluster
settingsBuilder.put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
settingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followIndex);
settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
settingsBuilder.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
imdBuilder.settings(settingsBuilder);

// Copy mappings from leader IMD to follow IMD
for (ObjectObjectCursor<String, MappingMetaData> cursor : leaderIndexMetaData.getMappings()) {
imdBuilder.putMapping(cursor.value);
}
imdBuilder.setRoutingNumShards(leaderIndexMetaData.getRoutingNumShards());
IndexMetaData followIMD = imdBuilder.build();
mdBuilder.put(followIMD, false);

ClusterState.Builder builder = ClusterState.builder(currentState);
builder.metaData(mdBuilder.build());
ClusterState updatedState = builder.build();

RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())
.addAsNew(updatedState.metaData().index(request.getFollowRequest().getFollowerIndex()));
updatedState = allocationService.reroute(
ClusterState.builder(updatedState).routingTable(routingTableBuilder.build()).build(),
"follow index [" + request.getFollowRequest().getFollowerIndex() + "] created");

logger.info("[{}] creating index, cause [ccr_create_and_follow], shards [{}]/[{}]",
followIndex, followIMD.getNumberOfShards(), followIMD.getNumberOfReplicas());

return updatedState;
}
});
}

private void initiateFollowing(final PutFollowAction.Request request, final ActionListener<PutFollowAction.Response> listener) {
activeShardsObserver.waitForActiveShards(new String[]{request.getFollowRequest().getFollowerIndex()},
ActiveShardCount.DEFAULT, request.timeout(), result -> {
if (result) {
client.execute(ResumeFollowAction.INSTANCE, request.getFollowRequest(), ActionListener.wrap(
r -> listener.onResponse(new PutFollowAction.Response(true, true, r.isAcknowledged())),
listener::onFailure
));
} else {
listener.onResponse(new PutFollowAction.Response(true, false, false));
}
}, listener::onFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
Expand All @@ -23,7 +24,9 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -52,6 +55,7 @@ public final class TransportPutFollowAction
private final RestoreService restoreService;
private final CcrLicenseChecker ccrLicenseChecker;
private final ActiveShardsObserver activeShardsObserver;
private final Pre67PutFollow pre67PutFollow;

@Inject
public TransportPutFollowAction(
Expand All @@ -63,6 +67,7 @@ public TransportPutFollowAction(
final IndexNameExpressionResolver indexNameExpressionResolver,
final Client client,
final RestoreService restoreService,
final AllocationService allocationService,
final CcrLicenseChecker ccrLicenseChecker) {
super(
settings,
Expand All @@ -77,6 +82,7 @@ public TransportPutFollowAction(
this.restoreService = restoreService;
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.pre67PutFollow = new Pre67PutFollow(client, clusterService, allocationService, activeShardsObserver);
}

@Override
Expand Down Expand Up @@ -108,18 +114,22 @@ protected void masterOperation(
client.getRemoteClusterClient(remoteCluster);

String leaderIndex = request.getLeaderIndex();
Version minNodeVersion = state.getNodes().getMinNodeVersion();
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client,
remoteCluster,
leaderIndex,
listener::onFailure,
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, request, listener));
(historyUUID, metaDataTuple) -> createFollowerIndex(metaDataTuple, historyUUID, request, listener, minNodeVersion));
}

private void createFollowerIndex(
final IndexMetaData leaderIndexMetaData,
final Tuple<ClusterState, IndexMetaData> metaDataTuple,
final String [] historyUUID,
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
final ActionListener<PutFollowAction.Response> listener,
final Version localClusterMinNodeVersion) {
IndexMetaData leaderIndexMetaData = metaDataTuple.v2();
if (leaderIndexMetaData == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist"));
return;
Expand All @@ -132,39 +142,49 @@ private void createFollowerIndex(
return;
}

final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);
boolean pre67CompatibilityMode = localClusterMinNodeVersion.before(Version.V_6_7_0)
|| metaDataTuple.v1().getNodes().getMinNodeVersion().before(Version.V_6_7_0);

final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
if (pre67CompatibilityMode) {
logger.warn("Pre-6.7 nodes present in local/remote cluster. Cannot bootstrap from remote. Creating empty follower index [{}] " +
Tim-Brooks marked this conversation as resolved.
Show resolved Hide resolved
"and initiating following [{}, {}].", request.getFollowRequest().getFollowerIndex(), request.getRemoteCluster(),
request.getLeaderIndex());
pre67PutFollow.doPre67PutFollow(request, leaderIndexMetaData, historyUUID, listener);
} else {
final Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getFollowRequest().getFollowerIndex())
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
final String leaderClusterRepoName = CcrRepository.NAME_PREFIX + request.getRemoteCluster();
final RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(leaderClusterRepoName, CcrRepository.LATEST)
.indices(request.getLeaderIndex()).indicesOptions(request.indicesOptions()).renamePattern("^(.*)$")
.renameReplacement(request.getFollowRequest().getFollowerIndex()).masterNodeTimeout(request.masterNodeTimeout())
.indexSettings(settingsBuilder);

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
final Client clientWithHeaders = CcrLicenseChecker.wrapClient(this.client, threadPool.getThreadContext().getHeaders());
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {

@Override
public void onResponse(RestoreService.RestoreCompletionResponse response) {
afterRestoreStarted(clientWithHeaders, request, listener, response);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
});
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}

@Override
protected void doRun() throws Exception {
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {

@Override
public void onResponse(RestoreService.RestoreCompletionResponse response) {
afterRestoreStarted(clientWithHeaders, request, listener, response);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
});
}
}

private void afterRestoreStarted(Client clientWithHeaders, PutFollowAction.Request request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ protected void masterOperation(final ResumeFollowAction.Request request,
leaderCluster,
leaderIndex,
listener::onFailure,
(leaderHistoryUUID, leaderIndexMetadata) -> {
(leaderHistoryUUID, metaDataTuple) -> {
try {
start(request, leaderCluster, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener);
start(request, leaderCluster, metaDataTuple.v2(), followerIndexMetadata, leaderHistoryUUID, listener);
} catch (final IOException e) {
listener.onFailure(e);
}
Expand Down