Skip to content

Commit

Permalink
Prevent snapshot backed indices to be followed using CCR (#70580)
Browse files Browse the repository at this point in the history
Today nothing prevents CCR's auto-follow patterns to pick 
up snapshot backed indices on a remote cluster. This can 
lead to various errors on the follower cluster that are not 
obvious to troubleshoot for a user (ex: multiple engine 
factories provided).

This commit adds verifications to CCR to make it fail faster 
when a user tries to follow an index that is backed by a 
snapshot, providing a more obvious error message.
  • Loading branch information
tlrx authored Mar 24, 2021
1 parent 296ac1a commit efa6aea
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
Expand Down Expand Up @@ -1229,8 +1230,12 @@ protected static void createIndex(String name, Settings settings, String mapping
}

protected static void deleteIndex(String name) throws IOException {
deleteIndex(client(), name);
}

protected static void deleteIndex(RestClient client, String name) throws IOException {
Request request = new Request("DELETE", "/" + name);
client().performRequest(request);
client.performRequest(request);
}

protected static void updateIndexSettings(String index, Settings.Builder settings) throws IOException {
Expand Down Expand Up @@ -1333,19 +1338,38 @@ protected static Map<String, Object> responseAsMap(Response response) throws IOE
}

protected static void registerRepository(String repository, String type, boolean verify, Settings settings) throws IOException {
registerRepository(client(), repository, type, verify, settings);
}

protected static void registerRepository(
RestClient client,
String repository,
String type,
boolean verify,
Settings settings
) throws IOException {
final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository);
request.addParameter("verify", Boolean.toString(verify));
request.setJsonEntity(Strings.toString(new PutRepositoryRequest(repository).type(type).settings(settings)));

final Response response = client().performRequest(request);
final Response response = client.performRequest(request);
assertAcked("Failed to create repository [" + repository + "] of type [" + type + "]: " + response, response);
}

protected static void createSnapshot(String repository, String snapshot, boolean waitForCompletion) throws IOException {
createSnapshot(client(), repository, snapshot, waitForCompletion);
}

protected static void createSnapshot(
RestClient client,
String repository,
String snapshot,
boolean waitForCompletion
) throws IOException {
final Request request = new Request(HttpPut.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
request.addParameter("wait_for_completion", Boolean.toString(waitForCompletion));

final Response response = client().performRequest(request);
final Response response = client.performRequest(request);
assertThat(
"Failed to create snapshot [" + snapshot + "] in repository [" + repository + "]: " + response,
response.getStatusLine().getStatusCode(),
Expand All @@ -1365,6 +1389,19 @@ protected static void restoreSnapshot(String repository, String snapshot, boolea
);
}

protected static void deleteSnapshot(String repository, String snapshot, boolean ignoreMissing) throws IOException {
deleteSnapshot(client(), repository, snapshot, ignoreMissing);
}

protected static void deleteSnapshot(RestClient client, String repository, String snapshot, boolean ignoreMissing) throws IOException {
final Request request = new Request(HttpDelete.METHOD_NAME, "_snapshot/" + repository + '/' + snapshot);
if (ignoreMissing) {
request.addParameter("ignore", "404");
}
final Response response = client.performRequest(request);
assertThat(response.getStatusLine().getStatusCode(), ignoreMissing ? anyOf(equalTo(200), equalTo(404)) : equalTo(200));
}

@SuppressWarnings("unchecked")
private static void assertAcked(String message, Response response) throws IOException {
final int responseStatusCode = response.getStatusLine().getStatusCode();
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugin/ccr/qa/multi-cluster/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ testClusters {
'leader-cluster' {
testDistribution = 'DEFAULT'
setting 'xpack.license.self_generated.type', 'trial'
setting 'path.repo', "${buildDir}/cluster/shared/repo/leader-cluster"
}
'middle-cluster' {
testDistribution = 'DEFAULT'
Expand All @@ -25,12 +26,14 @@ testClusters {
tasks.register("leader-cluster", RestIntegTestTask) {
mustRunAfter("precommit")
systemProperty 'tests.target_cluster', 'leader'
systemProperty 'tests.leader_cluster_repository_path', "${buildDir}/cluster/shared/repo/leader-cluster"
}

tasks.register("middle-cluster", RestIntegTestTask) {
dependsOn "leader-cluster"
useCluster testClusters."leader-cluster"
systemProperty 'tests.target_cluster', 'middle'
systemProperty 'tests.leader_cluster_repository_path', "${buildDir}/cluster/shared/repo/leader-cluster"
nonInputProperties.systemProperty 'tests.leader_host',
"${-> testClusters.named('leader-cluster').get().getAllHttpSocketURI().get(0)}"
}
Expand All @@ -41,6 +44,7 @@ tasks.register('follow-cluster', RestIntegTestTask) {
useCluster testClusters."leader-cluster"
useCluster testClusters."middle-cluster"
systemProperty 'tests.target_cluster', 'follow'
systemProperty 'tests.leader_cluster_repository_path', "${buildDir}/cluster/shared/repo/leader-cluster"
nonInputProperties.systemProperty 'tests.leader_host',
"${-> testClusters.named('leader-cluster').get().getAllHttpSocketURI().get(0)}"
nonInputProperties.systemProperty 'tests.middle_host',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@

package org.elasticsearch.xpack.ccr;

import org.apache.http.client.methods.HttpPost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectPath;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
import java.text.SimpleDateFormat;
Expand All @@ -23,12 +26,17 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.common.xcontent.ObjectPath.eval;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;

public class AutoFollowIT extends ESCCRRestTestCase {
Expand All @@ -37,7 +45,7 @@ public class AutoFollowIT extends ESCCRRestTestCase {

public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
logger.info("skipping test, waiting for target cluster [follow]");
return;
}

Expand Down Expand Up @@ -79,7 +87,7 @@ public void testMultipleAutoFollowPatternsDifferentClusters() throws Exception {

public void testAutoFollowPatterns() throws Exception {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
logger.info("skipping test, waiting for target cluster [follow]");
return;
}

Expand Down Expand Up @@ -138,7 +146,7 @@ public void testAutoFollowPatterns() throws Exception {

public void testPutAutoFollowPatternThatOverridesRequiredLeaderSetting() throws IOException {
if ("follow".equals(targetCluster) == false) {
logger.info("skipping test, waiting for target cluster [follow]" );
logger.info("skipping test, waiting for target cluster [follow]");
return;
}

Expand Down Expand Up @@ -211,7 +219,7 @@ public void testDataStreams() throws Exception {
// First rollover and ensure second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));

Expand All @@ -232,7 +240,7 @@ public void testDataStreams() throws Exception {
// Second rollover and ensure third backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3));
Expand Down Expand Up @@ -284,7 +292,7 @@ public void testDataStreams_autoFollowAfterDataStreamCreated() throws Exception
// Rollover and ensure only second backing index is replicated:
{
try (RestClient leaderClient = buildLeaderClient()) {
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
Request rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));

Expand Down Expand Up @@ -354,7 +362,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
// Rollover in leader cluster and ensure second backing index is replicated:
{
try (var leaderClient = buildLeaderClient()) {
var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
var rolloverRequest = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyDataStream(leaderClient, dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2));

Expand All @@ -374,7 +382,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {

// Try rollover in follow cluster
{
var rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover");
var rolloverRequest1 = new Request("POST", "/" + dataStreamName + "/_rollover");
var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
"because it is a replicated data stream"));
Expand All @@ -386,7 +394,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
unfollow(backingIndexName(dataStreamName, 1));

// Try again
var rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover");
var rolloverRequest2 = new Request("POST", "/" + dataStreamName + "/_rollover");
e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest2));
assertThat(e.getMessage(), containsString("data stream [" + dataStreamName + "] cannot be rolled over, " +
"because it is a replicated data stream"));
Expand All @@ -397,7 +405,7 @@ public void testRolloverDataStreamInFollowClusterForbidden() throws Exception {
assertOK(client().performRequest(promoteRequest));

// Try again and now the rollover should be successful because local data stream is now :
var rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover");
var rolloverRequest3 = new Request("POST", "/" + dataStreamName + "/_rollover");
assertOK(client().performRequest(rolloverRequest3));
verifyDataStream(client(), dataStreamName, backingIndexName(dataStreamName, 1), backingIndexName(dataStreamName, 2),
backingIndexName(dataStreamName, 3));
Expand Down Expand Up @@ -458,7 +466,7 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception {
// Rollover in leader cluster and ensure second backing index is replicated:
{
try (var leaderClient = buildLeaderClient()) {
var rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover");
var rolloverRequest = new Request("POST", "/" + aliasName + "/_rollover");
assertOK(leaderClient.performRequest(rolloverRequest));
verifyAlias(leaderClient, aliasName, true, aliasName + "-000002", aliasName + "-000001");

Expand All @@ -479,7 +487,7 @@ public void testRolloverAliasInFollowClusterForbidden() throws Exception {
// Try rollover in follow cluster, this should fail, because is_write_index property of an alias isn't
// replicated to follow cluster.
{
var rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover");
var rolloverRequest1 = new Request("POST", "/" + aliasName + "/_rollover");
var e = expectThrows(ResponseException.class, () -> client().performRequest(rolloverRequest1));
assertThat(e.getMessage(), containsString("rollover target [" + aliasName + "] does not point to a write index"));
verifyAlias(client(), aliasName, false, aliasName + "-000002", aliasName + "-000001");
Expand Down Expand Up @@ -663,6 +671,82 @@ public void testDataStreamsBiDirectionalReplication() throws Exception {
}
}

public void testAutoFollowSearchableSnapshotsFails() throws Exception {
if ("follow".equals(targetCluster) == false) {
return;
}

final String testPrefix = getTestName().toLowerCase(Locale.ROOT);
int initialNumberOfSuccessfulFollowedIndicesInFollowCluster = getNumberOfSuccessfulFollowedIndices();

final String autoFollowPattern = "pattern-" + testPrefix;
createAutoFollowPattern(client(), autoFollowPattern, testPrefix + "-*", "leader_cluster");

// Create a regular index on leader
final String regularIndex = testPrefix + "-regular";
{
try (var leaderClient = buildLeaderClient()) {
for (int i = 0; i < 10; i++) {
var indexRequest = new Request("POST", "/" + regularIndex + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"value\":" + i + "}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDocuments(leaderClient, regularIndex, 10);
}
}

// Create a snapshot backed index on leader
final String mountedIndex = testPrefix + "-mounted";
{
try (var leaderClient = buildLeaderClient()) {
final String systemPropertyRepoPath = System.getProperty("tests.leader_cluster_repository_path");
assertThat("Missing system property [tests.leader_cluster_repository_path]",
systemPropertyRepoPath, not(emptyOrNullString()));
final String repositoryPath = systemPropertyRepoPath + '/' + testPrefix;

final String repository = testPrefix + "-repository";
registerRepository(leaderClient, repository, "fs", true, Settings.builder().put("location", repositoryPath).build());

final String indexName = testPrefix + "-index";
for (int i = 0; i < 5; i++) {
var indexRequest = new Request("POST", "/" + indexName + "/_doc");
indexRequest.addParameter("refresh", "true");
indexRequest.setJsonEntity("{\"value\":" + i + "}");
assertOK(leaderClient.performRequest(indexRequest));
}
verifyDocuments(leaderClient, indexName, 5);

final String snapshot = testPrefix + "-snapshot";
deleteSnapshot(leaderClient, repository, snapshot, true);
createSnapshot(leaderClient, repository, snapshot, true);
deleteIndex(leaderClient, indexName);

final Request mountRequest = new Request(HttpPost.METHOD_NAME, "/_snapshot/" + repository + '/' + snapshot + "/_mount");
mountRequest.setJsonEntity("{\"index\": \"" + indexName + "\",\"renamed_index\": \"" + mountedIndex + "\"}");
final Response mountResponse = leaderClient.performRequest(mountRequest);
assertThat(mountResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus()));
ensureYellow(mountedIndex, leaderClient);
}
}

assertBusy(() -> {
Request statsRequest = new Request("GET", "/_ccr/stats");
Map<?, ?> response = toMap(client().performRequest(statsRequest));
assertThat(eval("auto_follow_stats.number_of_successful_follow_indices", response),
equalTo(initialNumberOfSuccessfulFollowedIndicesInFollowCluster + 2));
assertThat(eval("auto_follow_stats.recent_auto_follow_errors", response),
hasSize(greaterThan(0)));
assertThat(eval("auto_follow_stats.recent_auto_follow_errors.0.auto_follow_exception.reason", response),
containsString("index to follow [" + mountedIndex + "] is a searchable snapshot index and cannot be used " +
"for cross-cluster replication purpose"));
ensureYellow(regularIndex);
verifyDocuments(client(), regularIndex, 10);
});

deleteAutoFollowPattern(client(), autoFollowPattern);
}

private int getNumberOfSuccessfulFollowedIndices() throws IOException {
return getNumberOfSuccessfulFollowedIndices(client());
}
Expand Down
Loading

0 comments on commit efa6aea

Please sign in to comment.