Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid auto following leader system indices in CCR #72815

Merged
merged 10 commits into from
Jul 9, 2021
1 change: 1 addition & 0 deletions docs/reference/migration/migrate_8_0.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ include::migrate_8_0/aggregations.asciidoc[]
include::migrate_8_0/allocation.asciidoc[]
include::migrate_8_0/analysis.asciidoc[]
include::migrate_8_0/breaker.asciidoc[]
include::migrate_8_0/ccr.asciidoc[]
include::migrate_8_0/cluster.asciidoc[]
include::migrate_8_0/discovery.asciidoc[]
include::migrate_8_0/eql.asciidoc[]
Expand Down
22 changes: 22 additions & 0 deletions docs/reference/migration/migrate_8_0/ccr.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[discrete]
[[breaking_80_ccr_changes]]
==== Cross Cluster Replication changes

//NOTE: The notable-breaking-changes tagged regions are re-used in the
//Installation and Upgrade Guide

//tag::notable-breaking-changes[]

// end::notable-breaking-changes[]

.Remote system indices are not followed automatically if they match an auto-follow pattern.
[%collapsible]
====
*Details* +
Remote system indices matching an <<ccr-auto-follow,auto-follow pattern>>
won't be configured as a follower index automatically.

*Impact* +
Explicitly <<ccr-put-follow,create a follower index>> to follow a remote system
index if that's the wanted behaviour.
====
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SystemIndexPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.CcrIntegTestCase;
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
Expand All @@ -35,14 +38,17 @@
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand All @@ -60,6 +66,30 @@ protected boolean reuseClusters() {
return false;
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Stream.concat(super.nodePlugins().stream(), Stream.of(FakeSystemIndex.class)).collect(Collectors.toList());
}

public static class FakeSystemIndex extends Plugin implements SystemIndexPlugin {
public static final String SYSTEM_INDEX_NAME = ".fake-system-index";

@Override
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
return Collections.singletonList(new SystemIndexDescriptor(SYSTEM_INDEX_NAME, "test index"));
}

@Override
public String getFeatureName() {
return "fake system index";
}

@Override
public String getFeatureDescription() {
return "fake system index";
}
}

public void testAutoFollow() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
Expand Down Expand Up @@ -93,6 +123,28 @@ public void testAutoFollow() throws Exception {
assertFalse(ESIntegTestCase.indexExists("copy-logs-201812", followerClient()));
}

public void testAutoFollowDoNotFollowSystemIndices() throws Exception {
putAutoFollowPatterns("my-pattern", new String[] {".*", "logs-*"});

// Trigger system index creation
leaderClient().prepareIndex(FakeSystemIndex.SYSTEM_INDEX_NAME)
.setSource(Map.of("a", "b"))
.execute()
.actionGet();

Settings leaderIndexSettings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
.build();
createLeaderIndex("logs-201901", leaderIndexSettings);
assertLongBusy(() -> {
AutoFollowStats autoFollowStats = getAutoFollowStats();
assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L));
assertTrue(ESIntegTestCase.indexExists("copy-logs-201901", followerClient()));
assertFalse(ESIntegTestCase.indexExists("copy-.fake-system-index", followerClient()));
});
}

public void testCleanFollowedLeaderIndexUUIDs() throws Exception {
Settings leaderIndexSettings = Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
Expand Down Expand Up @@ -71,6 +71,7 @@
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices;
import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -2150,11 +2151,115 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> pa
}
}

public void testSystemIndicesAreNotAutoFollowed() {
ClusterState clusterState = null;
final int nbLeaderSystemIndices = randomIntBetween(1, 15);
for (int i = 0; i < nbLeaderSystemIndices; i++) {
String indexName = "." + i;
if (clusterState == null) {
clusterState = createRemoteClusterState(indexName, true, 0, true);
} else {
clusterState = createRemoteClusterState(clusterState, true, indexName);
}
}
final List<AutoFollowCoordinator.AutoFollowResult> autoFollowResults = executeAutoFollow(".*", clusterState);
assertThat(autoFollowResults.size(), equalTo(1));
assertThat(autoFollowResults.get(0).autoFollowExecutionResults, is(anEmptyMap()));
}

public void testSystemDataStreamsAreNotAutoFollowed() {
final List<AutoFollowCoordinator.AutoFollowResult> autoFollowResults =
executeAutoFollow("*.", createRemoteClusterStateWithDataStream(".test-data-stream"));

assertThat(autoFollowResults.size(), equalTo(1));
assertThat(autoFollowResults.get(0).autoFollowExecutionResults, is(anEmptyMap()));
}

private List<AutoFollowCoordinator.AutoFollowResult> executeAutoFollow(String indexPattern, final ClusterState finalRemoteState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to add a single test that also uses this method, that actually creates an index. It is sort of a test of the test, but given the complexity of this test method and the fact that if it just returns nothing, we are good, I think it is worthwhile.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in d8a7c41

final Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString())).thenReturn(client);

final String pattern = "pattern1";
final ClusterState localState = ClusterState.builder(new ClusterName("local"))
.metadata(Metadata.builder()
.putCustom(AutoFollowMetadata.TYPE,
new AutoFollowMetadata(
Map.of(
pattern,
new AutoFollowPattern(
"remote",
List.of(indexPattern),
Collections.emptyList(),
null,
Settings.EMPTY,
true,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
),
Map.of(pattern, List.of()),
Map.of(pattern, Map.of()))))
.build();

final AtomicReference<ClusterState> lastModifiedClusterState = new AtomicReference<>(localState);
final List<AutoFollowCoordinator.AutoFollowResult> results = new ArrayList<>();
final Set<Object> followedIndices = ConcurrentCollections.newConcurrentSet();
final AutoFollower autoFollower =
new AutoFollower("remote", results::addAll, localClusterStateSupplier(localState), () -> 1L, Runnable::run) {
@Override
void getRemoteClusterState(String remoteCluster,
long metadataVersion,
BiConsumer<ClusterStateResponse, Exception> handler) {
assertThat(remoteCluster, equalTo("remote"));
handler.accept(new ClusterStateResponse(new ClusterName("remote"), finalRemoteState, false), null);
}

@Override
void createAndFollow(Map<String, String> headers,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler) {
followedIndices.add(followRequest.getLeaderIndex());
successHandler.run();
}

@Override
void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunction, Consumer<Exception> handler) {
lastModifiedClusterState.updateAndGet(updateFunction::apply);
handler.accept(null);
}

@Override
void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List<String> patterns) {
// Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
}
};
autoFollower.start();

assertThat(results, notNullValue());
return results;
}

private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes) {
return createRemoteClusterState(indexName, enableSoftDeletes, 0L);
}

private static ClusterState createRemoteClusterState(String indexName, boolean enableSoftDeletes, long metadataVersion) {
return createRemoteClusterState(indexName, enableSoftDeletes, metadataVersion, false);
}

private static ClusterState createRemoteClusterState(String indexName,
boolean enableSoftDeletes,
long metadataVersion,
boolean systemIndex) {
Settings.Builder indexSettings;
if (enableSoftDeletes == false) {
indexSettings = settings(VersionUtils.randomPreviousCompatibleVersion(random(), Version.V_8_0_0))
Expand All @@ -2168,6 +2273,7 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e
.settings(indexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.system(systemIndex)
.build();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder()
Expand All @@ -2180,7 +2286,12 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
}


private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) {
return createRemoteClusterState(previous, false, indices);
}

private static ClusterState createRemoteClusterState(final ClusterState previous, boolean systemIndices, final String... indices) {
if (indices == null) {
return previous;
}
Expand All @@ -2192,6 +2303,7 @@ private static ClusterState createRemoteClusterState(final ClusterState previous
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random())))
.numberOfShards(1)
.numberOfReplicas(0)
.system(systemIndices)
.build();
metadataBuilder.put(indexMetadata, true);
routingTableBuilder.add(IndexRoutingTable.builder(indexMetadata.getIndex())
Expand Down Expand Up @@ -2230,6 +2342,10 @@ private ClusterService mockClusterService() {
}

private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
return createRemoteClusterStateWithDataStream(dataStreamName, false);
}

private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
Settings.Builder indexSettings = settings(Version.CURRENT);
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
indexSettings.put("index.hidden", true);
Expand All @@ -2238,9 +2354,10 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
.settings(indexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.system(system)
.build();
DataStream dataStream = new DataStream(dataStreamName, new DataStream.TimestampField("@timestamp"),
List.of(indexMetadata.getIndex()));
List.of(indexMetadata.getIndex()), 1, null, false, false, system);
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder()
.put(indexMetadata, true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,16 @@ public boolean match(IndexAbstraction indexAbstraction) {
public static boolean match(List<String> leaderIndexPatterns,
List<String> leaderIndexExclusionPatterns,
IndexAbstraction indexAbstraction) {
boolean matches = Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getName()) == false &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName());
boolean matches = indexAbstraction.isSystem() == false &&
Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getName()) == false &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getName());

if (matches) {
return true;
} else {
return indexAbstraction.getParentDataStream() != null &&
final IndexAbstraction.DataStream parentDataStream = indexAbstraction.getParentDataStream();
return parentDataStream != null &&
parentDataStream.isSystem() == false &&
Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false &&
Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
}
Expand Down