Skip to content

Commit

Permalink
Add deprecation warning when a system index is followed
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez committed May 25, 2021
1 parent 9f01163 commit 9b8adc0
Show file tree
Hide file tree
Showing 21 changed files with 180 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,28 @@

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.WarningFailureException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskResultsService;

import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -235,6 +242,36 @@ public void testFollowSearchableSnapshotsFails() throws Exception {
}
}

public void testFollowSystemIndexTriggersDeprecationWarnings() throws Exception {
if ("leader".equals(targetCluster)) {
final Request request = new Request("POST", "/" + TaskResultsService.TASK_INDEX + "/_doc/123");
XContentBuilder document = jsonBuilder();
document.startObject();
document.field("completed", true);
document.endObject();

// Avoid throwing a warning exception since we're writing into a system index
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> false));
request.setJsonEntity(Strings.toString(document));
assertOK(adminClient().performRequest(request));
ensureGreen(TaskResultsService.TASK_INDEX);
} else {
final WarningFailureException exception = expectThrows(WarningFailureException.class,
() -> followIndex(TaskResultsService.TASK_INDEX, ".tasks-follower"));

final List<String> warnings = exception.getResponse().getWarnings();
boolean expectedWarningMessageFound = warnings.stream()
.anyMatch(warning ->
warning.startsWith("Following a system index [" + TaskResultsService.TASK_INDEX) &&
warning.endsWith("will not work in the next major version"));
assertThat(
"Expected to find a warning about following system indices but the warnings are: " + warnings,
expectedWarningMessageFound,
equalTo(true)
);
}
}

@Override
protected Settings restClientSettings() {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
Expand Down Expand Up @@ -117,7 +118,7 @@ public void testCleanFollowedLeaderIndexUUIDs() throws Exception {

Metadata metadata = getFollowerCluster().clusterService().state().metadata();
String leaderIndexUUID = metadata.index("copy-logs-201901")
.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)
.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY)
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
AutoFollowMetadata autoFollowMetadata = metadata.custom(AutoFollowMetadata.TYPE);
assertThat(autoFollowMetadata, notNullValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;

import java.io.IOException;
Expand Down Expand Up @@ -185,7 +186,7 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I
IndexMetadata leaderMetadata = leaderState.getState().metadata().index(leaderIndex);
IndexMetadata followerMetadata = followerState.getState().metadata().index(followerIndex);
assertEquals(leaderMetadata.getNumberOfShards(), followerMetadata.getNumberOfShards());
Map<String, String> ccrMetadata = followerMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
Map<String, String> ccrMetadata = followerMetadata.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
assertEquals(leaderIndex, ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY));
assertEquals(leaderMetadata.getIndexUUID(), ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY));
assertEquals("leader_cluster", ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, EnginePlugin, RepositoryPlugin, ClusterPlugin {

public static final String CCR_THREAD_POOL_NAME = "ccr";
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;

Expand Down Expand Up @@ -555,7 +556,7 @@ private static boolean leaderIndexAlreadyFollowed(AutoFollowPattern autoFollowPa
// we should let the auto follower attempt to auto follow it, so it can fail later and
// it is then visible in the auto follow stats. For example a cluster can just happen to have
// an index with the same name as the new follower index.
Map<String, String> customData = indexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
Map<String, String> customData = indexMetadata.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
if (customData != null) {
String recordedLeaderIndexUUID = customData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
return leaderIndex.getUUID().equals(recordedLeaderIndexUUID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;

import java.util.ArrayList;
Expand Down Expand Up @@ -521,7 +522,7 @@ private void logRetentionLeaseFailure(final String retentionLeaseId, final Throw

private String getLeaderShardHistoryUUID(ShardFollowTask params) {
IndexMetadata followIndexMetadata = clusterService.state().metadata().index(params.getFollowShardId().getIndex());
Map<String, String> ccrIndexMetadata = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
Map<String, String> ccrIndexMetadata = followIndexMetadata.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
String[] recordedLeaderShardHistoryUUIDs = extractLeaderShardHistoryUUIDs(ccrIndexMetadata);
return recordedLeaderShardHistoryUUIDs[params.getLeaderShardId().id()];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
Expand Down Expand Up @@ -65,7 +66,7 @@ static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, C

for (String index : concreteFollowerIndices) {
IndexMetadata indexMetadata = state.metadata().index(index);
Map<String, String> ccrCustomData = indexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
Map<String, String> ccrCustomData = indexMetadata.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
if (ccrCustomData != null) {
Optional<ShardFollowTask> result;
if (persistentTasks != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;

Expand Down Expand Up @@ -56,7 +56,7 @@ protected void masterOperation(PauseFollowAction.Request request,
listener.onFailure(new IndexNotFoundException(request.getFollowIndex()));
return;
}
if (followerIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) == null) {
if (followerIMD.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY) == null) {
listener.onFailure(new IllegalArgumentException("index [" + request.getFollowIndex() + "] is not a follower index"));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.license.LicenseUtils;
Expand All @@ -52,12 +55,14 @@
import java.util.Locale;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public final class TransportPutFollowAction
extends TransportMasterNodeAction<PutFollowAction.Request, PutFollowAction.Response> {

private static final Logger logger = LogManager.getLogger(TransportPutFollowAction.class);
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(TransportPutFollowAction.class);

private final IndexScopedSettings indexScopedSettings;
private final Client client;
Expand Down Expand Up @@ -135,6 +140,12 @@ private void createFollowerIndex(
return;
}

if (leaderIndexMetadata.isSystem()) {
deprecationLogger.deprecate(DeprecationCategory.INDICES,
"ccr_follow_system_indices",
"Following a system index " + leaderIndexMetadata.getIndex() + " will not work in the next major version"
);
}
final Settings replicatedRequestSettings = TransportResumeFollowAction.filter(request.getSettings());
if (replicatedRequestSettings.isEmpty() == false) {
final List<String> unknownKeys =
Expand Down Expand Up @@ -232,18 +243,21 @@ public void onFailure(Exception e) {
listener = originalListener;
}

final Supplier<ThreadContext.StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(true);
RestoreClusterStateListener.createAndRegisterListener(clusterService, response, listener.delegateFailure(
(delegatedListener, restoreSnapshotResponse) -> {
RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
if (restoreInfo == null) {
// If restoreInfo is null then it is possible there was a master failure during the
// restore.
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
} else if (restoreInfo.failedShards() == 0) {
initiateFollowing(clientWithHeaders, request, delegatedListener);
} else {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
try (ThreadContext.StoredContext ctx = contextSupplier.get()) {
RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
if (restoreInfo == null) {
// If restoreInfo is null then it is possible there was a master failure during the
// restore.
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
} else if (restoreInfo.failedShards() == 0) {
initiateFollowing(clientWithHeaders, request, delegatedListener);
} else {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
}
}
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.FollowParameters;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;
Expand Down Expand Up @@ -121,7 +122,7 @@ protected void masterOperation(final ResumeFollowAction.Request request,
return;
}

final Map<String, String> ccrMetadata = followerIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final Map<String, String> ccrMetadata = followerIndexMetadata.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
if (ccrMetadata == null) {
throw new IllegalArgumentException("follow index ["+ request.getFollowerIndex() + "] does not have ccr metadata");
}
Expand Down Expand Up @@ -183,7 +184,7 @@ static void validate(
final MapperService followerMapperService) {
FollowParameters parameters = request.getParameters();

Map<String, String> ccrIndexMetadata = followIndex.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
Map<String, String> ccrIndexMetadata = followIndex.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
if (ccrIndexMetadata == null) {
throw new IllegalArgumentException("follow index ["+ followIndex.getIndex().getName() + "] does not have ccr metadata");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrRetentionLeases;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.CCR;
import org.elasticsearch.xpack.core.ccr.action.ShardFollowTask;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

Expand Down Expand Up @@ -95,7 +96,7 @@ public void onFailure(final String source, final Exception e) {
@Override
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
final IndexMetadata indexMetadata = oldState.metadata().index(request.getFollowerIndex());
final Map<String, String> ccrCustomMetadata = indexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final Map<String, String> ccrCustomMetadata = indexMetadata.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
final String remoteClusterName = ccrCustomMetadata.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY);

final String leaderIndexName = ccrCustomMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
Expand Down Expand Up @@ -222,7 +223,7 @@ static ClusterState unfollow(String followerIndex, ClusterState current) {
throw new IndexNotFoundException(followerIndex);
}

if (followerIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) == null) {
if (followerIMD.getCustomData(CCR.CCR_CUSTOM_METADATA_KEY) == null) {
throw new IllegalArgumentException("index [" + followerIndex + "] is not a follower index");
}

Expand Down Expand Up @@ -253,7 +254,7 @@ static ClusterState unfollow(String followerIndex, ClusterState current) {
newIndexMetadata.settings(builder);
newIndexMetadata.settingsVersion(followerIMD.getSettingsVersion() + 1);
// Remove ccr custom metadata
newIndexMetadata.removeCustom(Ccr.CCR_CUSTOM_METADATA_KEY);
newIndexMetadata.removeCustom(CCR.CCR_CUSTOM_METADATA_KEY);

Metadata newMetadata = Metadata.builder(current.metadata())
.put(newIndexMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.elasticsearch.xpack.ccr.action.repositories.GetCcrRestoreFileChunkRequest;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionRequest;
import org.elasticsearch.xpack.core.ccr.CCR;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -222,7 +223,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetadata.getIndexUUID());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetadata.getIndex().getName());
metadata.put(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY, remoteClusterAlias);
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
imdBuilder.putCustom(CCR.CCR_CUSTOM_METADATA_KEY, metadata);

imdBuilder.settings(leaderIndexMetadata.getSettings());

Expand Down Expand Up @@ -343,7 +344,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh
// TODO: Add timeouts to network calls / the restore process.
createEmptyStore(store);

final Map<String, String> ccrMetadata = store.indexSettings().getIndexMetadata().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
final Map<String, String> ccrMetadata = store.indexSettings().getIndexMetadata().getCustomData(CCR.CCR_CUSTOM_METADATA_KEY);
final String leaderIndexName = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY);
final String leaderUUID = ccrMetadata.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
final Index leaderIndex = new Index(leaderIndexName, leaderUUID);
Expand Down
Loading

0 comments on commit 9b8adc0

Please sign in to comment.