Skip to content

Commit

Permalink
Migrate watcher to system indices infrastructure (#67588) (#69328)
Browse files Browse the repository at this point in the history
Backport of #67588. Part of #61656.

Migrate the `.watches` and `.triggered_watches` system indices to use
the auto-create infrastructure. The watcher history indices are left
alone.

As part of this work, a `SystemIndexDescriptor` now inspects its
mappings to determine whether it has any dynamic mappings. This
influences how strict Elasticsearch is with enforcing the descriptor's
mappings, since ES cannot know in advanced what all the mappings will
be.

This PR also fixes the `SystemIndexManager` so that (1) it doesn't fall
over when attempting to inspect the state of an index that hasn't been
created yet, and (2) does update mappings if there's no version field in
the mapping metadata.
  • Loading branch information
pugnascotia authored Feb 23, 2021
1 parent 4368b12 commit b9f0814
Show file tree
Hide file tree
Showing 15 changed files with 336 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static String checkForSystemIndexViolations(SystemIndices systemIndices, Index[]

for (Index index : concreteIndices) {
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName());
if (descriptor != null && descriptor.isAutomaticallyManaged()) {
if (descriptor != null && descriptor.isAutomaticallyManaged() && descriptor.hasDynamicMappings() == false) {
final String descriptorMappings = descriptor.getMappings();
// Technically we could trip over a difference in whitespace here, but then again nobody should be trying to manually
// update a descriptor's mappings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

/**
Expand Down Expand Up @@ -75,6 +78,9 @@ public class SystemIndexDescriptor {
/** The minimum cluster node version required for this descriptor, or null if there is no restriction */
private final Version minimumNodeVersion;

/** Whether there are dynamic fields in this descriptor's mappings */
private final boolean hasDynamicMappings;

/**
* Creates a descriptor for system indices matching the supplied pattern. These indices will not be managed
* by Elasticsearch internally.
Expand Down Expand Up @@ -176,8 +182,12 @@ public SystemIndexDescriptor(String indexPattern, String description) {
this.origin = origin;
this.indexType = indexType;
this.minimumNodeVersion = minimumNodeVersion;

this.hasDynamicMappings = this.mappings != null
&& findDynamicMapping(XContentHelper.convertToMap(JsonXContent.jsonXContent, mappings, false));
}


/**
* @return The pattern of index names that this descriptor will be used for.
*/
Expand Down Expand Up @@ -266,6 +276,10 @@ public String getIndexType() {
return indexType;
}

public boolean hasDynamicMappings() {
return this.hasDynamicMappings;
}

/**
* Checks that this descriptor can be used within this cluster e.g. the cluster supports all required
* features, by comparing the supplied minimum node version to this descriptor's minimum version.
Expand Down Expand Up @@ -436,4 +450,32 @@ private static String patternToRegex(String input) {
output = output.replaceAll("\\*", ".*");
return output;
}

/**
* Recursively searches for <code>dynamic: true</code> in the supplies mappings
* @param map a parsed fragment of an index's mappings
* @return whether the fragment contains a dynamic mapping
*/
@SuppressWarnings("unchecked")
static boolean findDynamicMapping(Map<String, Object> map) {
if (map == null) {
return false;
}

for (Map.Entry<String, Object> entry : map.entrySet()) {
final String key = entry.getKey();
final Object value = entry.getValue();
if (key.equals("dynamic") && (value instanceof Boolean) && ((Boolean) value)) {
return true;
}

if (value instanceof Map) {
if (findDynamicMapping((Map<String, Object>) value)) {
return true;
}
}
}

return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,6 @@

package org.elasticsearch.indices;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_FORMAT_SETTING;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -39,6 +32,13 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_FORMAT_SETTING;

/**
* This class ensures that all system indices have up-to-date mappings, provided
* those indices can be automatically managed. Only some system indices are managed
Expand All @@ -52,6 +52,11 @@ public class SystemIndexManager implements ClusterStateListener {
private final Client client;
private final AtomicBoolean isUpgradeInProgress;

/**
* Creates a new manager
* @param systemIndices the indices to manage
* @param client used to update the cluster
*/
public SystemIndexManager(SystemIndices systemIndices, Client client) {
this.systemIndices = systemIndices;
this.client = client;
Expand Down Expand Up @@ -138,6 +143,11 @@ UpgradeStatus getUpgradeStatus(ClusterState clusterState, SystemIndexDescriptor
// The messages below will be logged on every cluster state update, which is why even in the index closed / red
// cases, the log levels are DEBUG.

if (indexState == null) {
logger.debug("Index {} does not exist yet", indexDescription);
return UpgradeStatus.UP_TO_DATE;
}

if (indexState.indexState == IndexMetadata.State.CLOSE) {
logger.debug("Index {} is closed. This is likely to prevent some features from functioning correctly", indexDescription);
return UpgradeStatus.CLOSED;
Expand Down Expand Up @@ -199,10 +209,16 @@ public void onFailure(Exception e) {

/**
* Derives a summary of the current state of a system index, relative to the given cluster state.
* @param state the cluster state from which to derive the index state
* @param descriptor the system index to check
* @return a summary of the index state, or <code>null</code> if the index doesn't exist
*/
State calculateIndexState(ClusterState state, SystemIndexDescriptor descriptor) {
final IndexMetadata indexMetadata = state.metadata().index(descriptor.getPrimaryIndex());
assert indexMetadata != null;

if (indexMetadata == null) {
return null;
}

final boolean isIndexUpToDate = INDEX_FORMAT_SETTING.get(indexMetadata.getSettings()) == descriptor.getIndexFormat();

Expand Down Expand Up @@ -256,6 +272,8 @@ private Version readMappingVersion(SystemIndexDescriptor descriptor, MappingMeta
final String versionString = (String) meta.get(descriptor.getVersionMetaKey());
if (versionString == null) {
logger.warn("No value found in mappings for [_meta.{}]", descriptor.getVersionMetaKey());
// If we called `Version.fromString(null)`, it would return `Version.CURRENT` and we wouldn't update the mappings
return Version.V_EMPTY;
}
return Version.fromString(versionString);
} catch (ElasticsearchParseException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,21 @@

package org.elasticsearch.indices;

import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.ESTestCase;

import java.util.Map;

import static org.elasticsearch.indices.SystemIndexDescriptor.findDynamicMapping;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;

public class SystemIndexDescriptorTests extends ESTestCase {

/**
* Tests the various validation rules that are applied when creating a new system index descriptor.
*/
public void testValidation() {
{
Exception ex = expectThrows(NullPointerException.class,
Expand Down Expand Up @@ -75,4 +83,35 @@ public void testValidation() {
);
}
}

/**
* Check that a system index descriptor correctly identifies the presence of a dynamic mapping when once is present.
*/
public void testFindDynamicMappingsWithDynamicMapping() {
String json = "{"
+ " \"foo\": {"
+ " \"bar\": {"
+ " \"dynamic\": false"
+ " },"
+ " \"baz\": {"
+ " \"dynamic\": true"
+ " }"
+ " }"
+ "}";

final Map<String, Object> mappings = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, false);

assertThat(findDynamicMapping(mappings), equalTo(true));
}

/**
* Check that a system index descriptor correctly identifies the absence of a dynamic mapping when none are present.
*/
public void testFindDynamicMappingsWithoutDynamicMapping() {
String json = "{ \"foo\": { \"bar\": { \"dynamic\": false } } }";

final Map<String, Object> mappings = XContentHelper.convertToMap(JsonXContent.jsonXContent, json, false);

assertThat(findDynamicMapping(mappings), equalTo(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,19 @@ public void testManagerProcessesIndicesWithOutdatedMappings() {
assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE));
}

/**
* Check that the manager will try to upgrade indices where the version in the metadata is null or absent.
*/
public void testManagerProcessesIndicesWithNullVersionMetadata() {
SystemIndices systemIndices = new SystemIndices(org.elasticsearch.common.collect.Map.of("MyIndex", FEATURE));
SystemIndexManager manager = new SystemIndexManager(systemIndices, client);

final ClusterState.Builder clusterStateBuilder = createClusterState(Strings.toString(getMappings(null)));
markShardsAvailable(clusterStateBuilder);

assertThat(manager.getUpgradeStatus(clusterStateBuilder.build(), DESCRIPTOR), equalTo(UpgradeStatus.NEEDS_MAPPINGS_UPDATE));
}

/**
* Check that the manager submits the expected request for an index whose mappings are out-of-date.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class WatcherIndexTemplateRegistryField {
// version 11: watch history indices are hidden
// version 12: templates changed to composable templates
// version 13: add `allow_auto_create` setting
// version 14: remove watches and triggered watches, these are now system indices
// Note: if you change this, also inform the kibana team around the watcher-ui
public static final int INDEX_TEMPLATE_VERSION = 13;
public static final int INDEX_TEMPLATE_VERSION_10 = 10;
Expand All @@ -32,17 +33,11 @@ public final class WatcherIndexTemplateRegistryField {
public static final String HISTORY_TEMPLATE_NAME_NO_ILM_10 = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION_10;
public static final String HISTORY_TEMPLATE_NAME_NO_ILM_11 = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION_11;
public static final String HISTORY_TEMPLATE_NAME_NO_ILM_12 = ".watch-history-no-ilm-" + INDEX_TEMPLATE_VERSION_12;
public static final String TRIGGERED_TEMPLATE_NAME = ".triggered_watches";
public static final String TRIGGERED_TEMPLATE_NAME_11 = ".triggered_watches-11";
public static final String TRIGGERED_TEMPLATE_NAME_12 = ".triggered_watches-12";
public static final String WATCHES_TEMPLATE_NAME = ".watches";
public static final String WATCHES_TEMPLATE_NAME_11 = ".watches-11";
public static final String WATCHES_TEMPLATE_NAME_12 = ".watches-12";
public static final String[] TEMPLATE_NAMES = new String[] {
HISTORY_TEMPLATE_NAME, TRIGGERED_TEMPLATE_NAME, WATCHES_TEMPLATE_NAME
HISTORY_TEMPLATE_NAME
};
public static final String[] TEMPLATE_NAMES_NO_ILM = new String[] {
HISTORY_TEMPLATE_NAME_NO_ILM, TRIGGERED_TEMPLATE_NAME, WATCHES_TEMPLATE_NAME
HISTORY_TEMPLATE_NAME_NO_ILM
};

private WatcherIndexTemplateRegistryField() {}
Expand Down
46 changes: 0 additions & 46 deletions x-pack/plugin/core/src/main/resources/triggered-watches.json

This file was deleted.

68 changes: 0 additions & 68 deletions x-pack/plugin/core/src/main/resources/watches.json

This file was deleted.

Loading

0 comments on commit b9f0814

Please sign in to comment.