Skip to content


Extract template management from Watcher (elastic#41169)
Browse files Browse the repository at this point in the history
This commit extracts the template management from Watcher into an
abstract class, so that templates and lifecycle policies can be managed
in the same way across multiple plugins. This will be useful for SLM, as
well as potentially ILM and any other plugins which need to manage index
  • Loading branch information
gwbrown authored and Gurkan Kaymak committed May 27, 2019
1 parent 0715e50 commit 27ed862
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 213 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.

package org.elasticsearch.xpack.core.template;

import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

* Describes an index template to be loaded from a resource file for use with an {@link IndexTemplateRegistry}.
public class IndexTemplateConfig {

private final String templateName;
private final String fileName;
private final String version;
private final String versionProperty;

* Describes a template to be loaded from a resource file. Includes handling for substituting a version property into the template.
* The {@code versionProperty} parameter will be used to substitute the value of {@code version} into the template. For example,
* this template:
* {@code {"myTemplateVersion": "${}"}}
* With {@code version = "42"; versionProperty = ""} will result in {@code {"myTemplateVersion": "42"}}.
* @param templateName The name that will be used for the index template. Literal, include the version in this string if
* it should be used.
* @param fileName The filename the template should be loaded from. Literal, should include leading {@literal /} and
* extension if necessary.
* @param version The version of the template. Substituted for {@code versionProperty} as described above.
* @param versionProperty The property that will be replaced with the {@code version} string as described above.
public IndexTemplateConfig(String templateName, String fileName, String version, String versionProperty) {
this.templateName = templateName;
this.fileName = fileName;
this.version = version;
this.versionProperty = versionProperty;

public String getFileName() {
return fileName;

public String getTemplateName() {
return templateName;

* Loads the template from disk as a UTF-8 byte array.
* @return The template as a UTF-8 byte array.
public byte[] loadBytes() {
String template = TemplateUtils.loadTemplate(fileName, version,
Pattern.quote("${" + versionProperty + "}"));
assert template != null && template.length() > 0;
return template.getBytes(StandardCharsets.UTF_8);
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.

package org.elasticsearch.xpack.core.template;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.action.PutLifecycleAction;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

* Abstracts the logic of managing versioned index templates and lifecycle policies for plugins that require such things.
public abstract class IndexTemplateRegistry implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(IndexTemplateRegistry.class);

protected final Settings settings;
protected final Client client;
protected final ThreadPool threadPool;
protected final NamedXContentRegistry xContentRegistry;
protected final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
protected final ConcurrentMap<String, AtomicBoolean> policyCreationsInProgress = new ConcurrentHashMap<>();

public IndexTemplateRegistry(Settings nodeSettings, ClusterService clusterService, ThreadPool threadPool, Client client,
NamedXContentRegistry xContentRegistry) {
this.settings = nodeSettings;
this.client = client;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;

* Retrieves return a list of {@link IndexTemplateConfig} that represents
* the index templates that should be installed and managed.
* @return The configurations for the templates that should be installed.
protected abstract List<IndexTemplateConfig> getTemplateConfigs();

* Retrieves a list of {@link LifecyclePolicyConfig} that represents the ILM
* policies that should be installed and managed. Only called if ILM is enabled.
* @return The configurations for the lifecycle policies that should be installed.
protected abstract List<LifecyclePolicyConfig> getPolicyConfigs();

* Retrieves an identifier that is used to identify which plugin is asking for this.
* @return A string ID for the plugin managing these templates.
protected abstract String getOrigin();

* Called when creation of an index template fails.
* @param config The template config that failed to be created.
* @param e The exception that caused the failure.
protected void onPutTemplateFailure(IndexTemplateConfig config, Exception e) {
logger.error(new ParameterizedMessage("error adding index template [{}] from [{}] for [{}]",
config.getTemplateName(), config.getFileName(), getOrigin()), e);

* Called when creation of a lifecycle policy fails.
* @param policy The lifecycle policy that failed to be created.
* @param e The exception that caused the failure.
protected void onPutPolicyFailure(LifecyclePolicy policy, Exception e) {
logger.error(new ParameterizedMessage("error adding lifecycle policy [{}] for [{}]",
policy.getName(), getOrigin()), e);

public void clusterChanged(ClusterChangedEvent event) {
ClusterState state = event.state();
if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// wait until the gateway has recovered from disk, otherwise we think may not have the index templates,
// while they actually do exist

// no master node, exit immediately
DiscoveryNode masterNode = event.state().getNodes().getMasterNode();
if (masterNode == null) {

// if this node is newer than the master node, we probably need to add the template, which might be newer than the
// template the master node has, so we need potentially add new templates despite being not the master node
DiscoveryNode localNode = event.state().getNodes().getLocalNode();
boolean localNodeVersionAfterMaster = localNode.getVersion().after(masterNode.getVersion());

if (event.localNodeMaster() || localNodeVersionAfterMaster) {

private void addTemplatesIfMissing(ClusterState state) {
final List<IndexTemplateConfig> indexTemplates = getTemplateConfigs();
for (IndexTemplateConfig template : indexTemplates) {
final String templateName = template.getTemplateName();
final AtomicBoolean creationCheck = templateCreationsInProgress.computeIfAbsent(templateName, key -> new AtomicBoolean(false));
if (creationCheck.compareAndSet(false, true)) {
if (!state.metaData().getTemplates().containsKey(templateName)) {
logger.debug("adding index template [{}] for [{}], because it doesn't exist", templateName, getOrigin());
putTemplate(template, creationCheck);
} else {
logger.trace("not adding index template [{}] for [{}], because it already exists", templateName, getOrigin());

private void putTemplate(final IndexTemplateConfig config, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
final String templateName = config.getTemplateName();

PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(config.loadBytes(), XContentType.JSON);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), getOrigin(), request,
new ActionListener<AcknowledgedResponse>() {
public void onResponse(AcknowledgedResponse response) {
if (response.isAcknowledged() == false) {
logger.error("error adding index template [{}] for [{}], request was not acknowledged",
templateName, getOrigin());

public void onFailure(Exception e) {
onPutTemplateFailure(config, e);
}, client.admin().indices()::putTemplate);

private void addIndexLifecyclePoliciesIfMissing(ClusterState state) {
boolean ilmSupported = XPackSettings.INDEX_LIFECYCLE_ENABLED.get(settings);

if (ilmSupported) {
Optional<IndexLifecycleMetadata> maybeMeta = Optional.ofNullable(state.metaData().custom(IndexLifecycleMetadata.TYPE));
List<LifecyclePolicy> policies = getPolicyConfigs().stream()
.map(policyConfig -> policyConfig.load(xContentRegistry))

for (LifecyclePolicy policy : policies) {
final AtomicBoolean creationCheck = policyCreationsInProgress.computeIfAbsent(policy.getName(),
key -> new AtomicBoolean(false));
if (creationCheck.compareAndSet(false, true)) {
final boolean policyNeedsToBeCreated = maybeMeta
.flatMap(ilmMeta -> Optional.ofNullable(ilmMeta.getPolicies().get(policy.getName())))
.isPresent() == false;
if (policyNeedsToBeCreated) {
logger.debug("adding lifecycle policy [{}] for [{}], because it doesn't exist", policy.getName(), getOrigin());
putPolicy(policy, creationCheck);
} else {
logger.trace("not adding lifecycle policy [{}] for [{}], because it already exists",
policy.getName(), getOrigin());

private void putPolicy(final LifecyclePolicy policy, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic();
executor.execute(() -> {
PutLifecycleAction.Request request = new PutLifecycleAction.Request(policy);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), getOrigin(), request,
new ActionListener<PutLifecycleAction.Response>() {
public void onResponse(PutLifecycleAction.Response response) {
if (response.isAcknowledged() == false) {
logger.error("error adding lifecycle policy [{}] for [{}], request was not acknowledged",
policy.getName(), getOrigin());

public void onFailure(Exception e) {
onPutPolicyFailure(policy, e);
}, (req, listener) -> new XPackClient(client).ilmClient().putLifecyclePolicy(req, listener));

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.

package org.elasticsearch.xpack.core.template;

import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicyUtils;

* Describes an index lifecycle policy to be loaded from a resource file for use with an {@link IndexTemplateRegistry}.
public class LifecyclePolicyConfig {

private final String policyName;
private final String fileName;

* Describes a lifecycle policy definition to be loaded from a resource file.
* @param policyName The name that will be used for the policy.
* @param fileName The filename the policy definition should be loaded from. Literal, should include leading {@literal /} and
* extension if necessary.
public LifecyclePolicyConfig(String policyName, String fileName) {
this.policyName = policyName;
this.fileName = fileName;

public String getPolicyName() {
return policyName;

public String getFileName() {
return fileName;

public LifecyclePolicy load(NamedXContentRegistry xContentRegistry) {
return LifecyclePolicyUtils.loadPolicy(policyName, fileName, xContentRegistry);

0 comments on commit 27ed862

Please sign in to comment.