Skip to content

Commit

Permalink
Avoid auto following leader system indices in CCR (#72815)
Browse files Browse the repository at this point in the history
Relates #67686
  • Loading branch information
fcofdez authored Jul 9, 2021
1 parent 91a1591 commit abc7a47
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 7 deletions.
1 change: 1 addition & 0 deletions docs/reference/migration/migrate_8_0.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ include::migrate_8_0/aggregations.asciidoc[]
include::migrate_8_0/allocation.asciidoc[]
include::migrate_8_0/analysis.asciidoc[]
include::migrate_8_0/breaker.asciidoc[]
include::migrate_8_0/ccr.asciidoc[]
include::migrate_8_0/cluster.asciidoc[]
include::migrate_8_0/discovery.asciidoc[]
include::migrate_8_0/eql.asciidoc[]
Expand Down
22 changes: 22 additions & 0 deletions docs/reference/migration/migrate_8_0/ccr.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[discrete]
[[breaking_80_ccr_changes]]
==== Cross Cluster Replication changes

//NOTE: The notable-breaking-changes tagged regions are re-used in the
//Installation and Upgrade Guide

//tag::notable-breaking-changes[]

// end::notable-breaking-changes[]

.Remote system indices are not followed automatically if they match an auto-follow pattern.
[%collapsible]
====
*Details* +
Remote system indices matching an <<ccr-auto-follow,auto-follow pattern>>
won't be configured as a follower index automatically.
*Impact* +
Explicitly <<ccr-put-follow,create a follower index>> to follow a remote system
index if that's the wanted behaviour.
====
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
Expand All @@ -35,14 +38,17 @@
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand All @@ -60,6 +66,30 @@ protected boolean reuseClusters() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(FakeSystemIndex.class)).collect(Collectors.toList());
}

public static class FakeSystemIndex extends Plugin implements SystemIndexPlugin {
public static final String SYSTEM_INDEX_NAME = ".fake-system-index";

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(SYSTEM_INDEX_NAME, "test index"));
}

@Override
public String getFeatureName() {
return "fake system index";
}

@Override
public String getFeatureDescription() {
return "fake system index";
}
}

public void testAutoFollow() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
Expand Down Expand Up @@ -93,6 +123,28 @@ public void testAutoFollow() throws Exception {
assertFalse(ESIntegTestCase.indexExists("copy-logs-201812", followerClient()));
}

public void testAutoFollowDoNotFollowSystemIndices() throws Exception {
putAutoFollowPatterns("my-pattern", new String[] {".*", "logs-*"});

// Trigger system index creation
leaderClient().prepareIndex(FakeSystemIndex.SYSTEM_INDEX_NAME)
.setSource(Map.of("a", "b"))
.execute()
.actionGet();

Settings leaderIndexSettings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
createLeaderIndex("logs-201901", leaderIndexSettings);
assertLongBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L));
assertTrue(ESIntegTestCase.indexExists("copy-logs-201901", followerClient()));
assertFalse(ESIntegTestCase.indexExists("copy-.fake-system-index", followerClient()));
});
}

public void testCleanFollowedLeaderIndexUUIDs() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
package org.elasticsearch.xpack.ccr.action;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;

import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand All @@ -24,13 +25,13 @@
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -71,6 +72,7 @@
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -2150,11 +2152,133 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> pa
}
}

public void testSystemIndicesAreNotAutoFollowed() {
ClusterState clusterState = null;
final int nbLeaderSystemIndices = randomIntBetween(1, 15);
for (int i = 0; i < nbLeaderSystemIndices; i++) {
String indexName = "." + i;
if (clusterState == null) {
clusterState = createRemoteClusterState(indexName, true, 0, true);
} else {
clusterState = createRemoteClusterState(clusterState, true, indexName);
}
}

Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> autoFollowResults = executeAutoFollow(".*", clusterState);
assertThat(autoFollowResults.v1().size(), equalTo(1));
assertThat(autoFollowResults.v1().get(0).autoFollowExecutionResults, is(anEmptyMap()));
assertThat(autoFollowResults.v2(), is(empty()));
}

public void testSystemDataStreamsAreNotAutoFollowed() {
Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> autoFollowResults =
executeAutoFollow("*.", createRemoteClusterStateWithDataStream(".test-data-stream"));

assertThat(autoFollowResults.v1().size(), equalTo(1));
assertThat(autoFollowResults.v1().get(0).autoFollowExecutionResults, is(anEmptyMap()));
assertThat(autoFollowResults.v2(), is(empty()));
}

public void testFollowerIndexIsCreatedInExecuteAutoFollow() {
final String indexName = "idx-1";
ClusterState clusterState = createRemoteClusterState(indexName, true, 0, false);

Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> autoFollowResults = executeAutoFollow("idx-*", clusterState);
assertThat(autoFollowResults.v1().size(), equalTo(1));
assertThat(autoFollowResults.v1().get(0).autoFollowExecutionResults.size(), equalTo(1));
for (Map.Entry<Index, Exception> autoFollowEntry : autoFollowResults.v1().get(0).autoFollowExecutionResults.entrySet()) {
assertThat(autoFollowEntry.getKey().getName(), equalTo(indexName));
assertThat(autoFollowEntry.getValue(), nullValue());
}
assertThat(autoFollowResults.v2().contains(indexName), equalTo(true));
}

private Tuple<List<AutoFollowCoordinator.AutoFollowResult>, Set<String>> executeAutoFollow(String indexPattern,
ClusterState finalRemoteState) {
final Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);

final String pattern = "pattern1";
final ClusterState localState = ClusterState.builder(new ClusterName("local"))
.metadata(Metadata.builder()
.putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(
Map.of(
pattern,
new AutoFollowPattern(
"remote",
List.of(indexPattern),
Collections.emptyList(),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
),
Map.of(pattern, List.of()),
Map.of(pattern, Map.of()))))
.build();

final AtomicReference<ClusterState> lastModifiedClusterState = new AtomicReference<>(localState);
final List<AutoFollowCoordinator.AutoFollowResult> results = new ArrayList<>();
final Set<String> followedIndices = ConcurrentCollections.newConcurrentSet();
final AutoFollower autoFollower =
new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
assertThat(remoteCluster, equalTo("remote"));
handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null);
}

@Override
void createAndFollow(Map<String, String> headers,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
followedIndices.add(followRequest.getLeaderIndex());
successHandler.run();
}

@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
lastModifiedClusterState.updateAndGet(updateFunction::apply);
handler.accept(null);
}

@Override
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
}
};
autoFollower.start();

assertThat(results, notNullValue());
return Tuple.tuple(results, followedIndices);
}

private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes) {
return createRemoteClusterState(indexName, enableSoftDeletes, 0L);
}

private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes, long metadataVersion) {
return createRemoteClusterState(indexName, enableSoftDeletes, metadataVersion, false);
}

private static ClusterState createRemoteClusterState(String indexName,
boolean enableSoftDeletes,
long metadataVersion,
boolean systemIndex) {
Settings.Builder indexSettings;
if (enableSoftDeletes == false) {
indexSettings = settings(VersionUtils.randomPreviousCompatibleVersion(random(), Version.V_8_0_0))
Expand All @@ -2168,6 +2292,7 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e
.settings(indexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.system(systemIndex)
.build();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder()
Expand All @@ -2180,7 +2305,12 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
}


private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) {
return createRemoteClusterState(previous, false, indices);
}

private static ClusterState createRemoteClusterState(final ClusterState previous, boolean systemIndices, final String... indices) {
if (indices == null) {
return previous;
}
Expand All @@ -2192,6 +2322,7 @@ private static ClusterState createRemoteClusterState(final ClusterState previous
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
.numberOfShards(1)
.numberOfReplicas(0)
.system(systemIndices)
.build();
metadataBuilder.put(indexMetadata, true);
routingTableBuilder.add(IndexRoutingTable.builder(indexMetadata.getIndex())
Expand Down Expand Up @@ -2230,6 +2361,10 @@ private ClusterService mockClusterService() {
}

private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
return createRemoteClusterStateWithDataStream(dataStreamName, false);
}

private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
Settings.Builder indexSettings = settings(Version.CURRENT);
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
indexSettings.put("index.hidden", true);
Expand All @@ -2238,9 +2373,10 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
.settings(indexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.system(system)
.build();
DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"),
List.of(indexMetadata.getIndex()));
List.of(indexMetadata.getIndex()), 1, null, false, false, system);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder()
.put(indexMetadata, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,16 @@ public boolean match(IndexAbstraction indexAbstraction) {
public static boolean match(List<String> leaderIndexPatterns,
List<String> leaderIndexExclusionPatterns,
IndexAbstraction indexAbstraction) {
boolean matches = Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getName()) == false &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName());
boolean matches = indexAbstraction.isSystem() == false &&
Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getName()) == false &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName());

if (matches) {
return true;
} else {
return indexAbstraction.getParentDataStream() != null &&
final IndexAbstraction.DataStream parentDataStream = indexAbstraction.getParentDataStream();
return parentDataStream != null &&
parentDataStream.isSystem() == false &&
Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
}
Expand Down

0 comments on commit abc7a47

Please sign in to comment.