Commit

Refresh cached phase policy definition if possible on new poli… (#50820)
* Refresh cached phase policy definition if possible on new policy

There are some cases when updating a policy does not change the
structure in a significant way. In these cases, we can reread the
policy definition for any indices using the updated policy.

This commit adds this refreshing to the `TransportPutLifecycleAction`
to allow this. It allows us to do things like change the configuration
values for a particular step, even when on that step (for example,
changing the rollover criteria while on the `check-rollover-ready` step).

There are more cases where the phase definition can be reread than just
the ones checked here (for example, removing an action that has already
been passed), and those will be added in subsequent work.

Relates to #48431
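
The refresh rule described above can be sketched in isolation. The following is a simplified, hypothetical model and not the Elasticsearch implementation: `PhaseRefreshSketch`, its `StepKey` class, and `canRefresh` are illustrative stand-ins for `Step.StepKey` and `isIndexPhaseDefinitionUpdatable`, the literal `"ERROR"` is assumed to correspond to `ErrorStep.NAME`, and the step names in `main` are only examples. The idea is that a step key carries no configuration values, so if the step keys for the current phase are unchanged, only configuration differs and the cached definition can be reread.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified, hypothetical model of the refresh check; these are not the Elasticsearch classes.
class PhaseRefreshSketch {

    /** Stand-in for Step.StepKey: identifies a step by phase, action, and step name. */
    static final class StepKey {
        final String phase;
        final String action;
        final String name;

        StepKey(String phase, String action, String name) {
            this.phase = phase;
            this.action = action;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof StepKey)) {
                return false;
            }
            StepKey other = (StepKey) o;
            return Objects.equals(phase, other.phase)
                && Objects.equals(action, other.action)
                && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(phase, action, name);
        }
    }

    /** Keeps only the step keys that belong to the given phase. */
    static Set<StepKey> keysForPhase(List<StepKey> keys, String phase) {
        return keys.stream()
            .filter(k -> phase.equals(k.phase))
            .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    /**
     * Returns true when the cached phase definition could be reread: the index has a
     * usable current step, that step still exists in the updated policy, and the set of
     * step keys for the current phase is the same before and after the update.
     */
    static boolean canRefresh(StepKey current, List<StepKey> cachedKeys, List<StepKey> updatedKeys) {
        // "ERROR" is an assumed stand-in for the error step's name.
        if (current == null || current.phase == null || "ERROR".equals(current.name)) {
            return false;
        }
        if (!updatedKeys.contains(current)) {
            return false;
        }
        // Set equality ignores ordering, so only membership is compared here.
        return keysForPhase(cachedKeys, current.phase).equals(keysForPhase(updatedKeys, current.phase));
    }

    public static void main(String[] args) {
        StepKey check = new StepKey("hot", "rollover", "check-rollover-ready");
        StepKey attempt = new StepKey("hot", "rollover", "attempt-rollover");
        List<StepKey> cached = Arrays.asList(check, attempt);

        // Same step keys, so only configuration values could have changed.
        System.out.println(canRefresh(check, cached, Arrays.asList(check, attempt))); // prints true

        // The current step no longer exists in the updated policy.
        List<StepKey> changed = Arrays.asList(new StepKey("hot", "set_priority", "set_priority"));
        System.out.println(canRefresh(check, cached, changed)); // prints false
    }
}
```

In this sketch, changing the rollover criteria leaves the step keys untouched and the check passes, while removing the action the index is currently executing makes it fail, which matches the cases the commit message describes.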
dakrone authored Jan 13, 2020
1 parent d0a01a6 commit f53c968
Showing 5 changed files with 756 additions and 11 deletions.
14 changes: 9 additions & 5 deletions docs/reference/ilm/policy-definitions.asciidoc
@@ -55,7 +55,7 @@ PUT _ilm/policy/my_policy
}
--------------------------------------------------

-The Above example configures a policy that moves the index into the warm
+The above example configures a policy that moves the index into the warm
phase after one day. Until then, the index is in a waiting state. After
moving into the warm phase, it will wait until 30 days have elapsed before
moving to the delete phase and deleting the index.
@@ -76,10 +76,14 @@ check occurs.
=== Phase Execution

The current phase definition, of an index's policy being executed, is stored
-in the index's metadata. The phase and its actions are compiled into a series
-of discrete steps that are executed sequentially. Since some {ilm-init} actions
-are more complex and involve multiple operations against an index, each of these
-operations are done in isolation in a unit called a "step". The
+in the index's metadata. This phase definition is cached to prevent changes to
+the policy from putting the index in a state where it cannot proceed from its
+current step. When the policy is updated we check to see if this phase
+definition can be safely updated, and if so, update the cached definition in
+indices using the updated policy. The phase and its actions are compiled into a
+series of discrete steps that are executed sequentially. Since some {ilm-init}
+actions are more complex and involve multiple operations against an index, each
+of these operations are done in isolation in a unit called a "step". The
<<ilm-explain-lifecycle,Explain Lifecycle API>> exposes this information to us
to see which step our index is either to execute next, or is currently
executing.
@@ -1342,6 +1342,41 @@ public void testRetryableInitializationStep() throws Exception {
});
}

+    public void testRefreshablePhaseJson() throws Exception {
+        String index = "refresh-index";
+
+        createNewSingletonPolicy("hot", new RolloverAction(null, null, 100L));
+        Request createIndexTemplate = new Request("PUT", "_template/rolling_indexes");
+        createIndexTemplate.setJsonEntity("{" +
+            "\"index_patterns\": [\""+ index + "-*\"], \n" +
+            " \"settings\": {\n" +
+            " \"number_of_shards\": 1,\n" +
+            " \"number_of_replicas\": 0,\n" +
+            " \"index.lifecycle.name\": \"" + policy+ "\",\n" +
+            " \"index.lifecycle.rollover_alias\": \"alias\"\n" +
+            " }\n" +
+            "}");
+        client().performRequest(createIndexTemplate);
+
+        createIndexWithSettings(index + "-1",
+            Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0),
+            true);
+
+        // Index a document
+        index(client(), index + "-1", "1", "foo", "bar");
+
+        // Wait for the index to enter the check-rollover-ready step
+        assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(WaitForRolloverReadyStep.NAME)));
+
+        // Update the policy to allow rollover at 1 document instead of 100
+        createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));
+
+        // Index should now have been able to roll over, creating the new index and proceeding to the "complete" step
+        assertBusy(() -> assertThat(indexExists(index + "-000002"), is(true)));
+        assertBusy(() -> assertThat(getStepKeyForIndex(index + "-1").getName(), equalTo(TerminalPolicyStep.KEY.getName())));
+    }

// This method should be called inside an assertBusy, it has no retry logic of its own
private void assertHistoryIsPresent(String policyName, String indexName, boolean success, String stepName) throws IOException {
assertHistoryIsPresent(policyName, indexName, success, null, null, stepName);
@@ -251,8 +251,8 @@ private static LifecycleExecutionState updateExecutionStateToStep(LifecyclePolic
/**
* Given a cluster state and lifecycle state, return a new state using the new lifecycle state for the given index.
*/
-private static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState,
-LifecycleExecutionState lifecycleState) {
+public static ClusterState.Builder newClusterStateWithLifecycleState(Index index, ClusterState clusterState,
+LifecycleExecutionState lifecycleState) {
ClusterState.Builder newClusterStateBuilder = ClusterState.builder(clusterState);
newClusterStateBuilder.metaData(MetaData.builder(clusterState.getMetaData())
.put(IndexMetaData.builder(clusterState.getMetaData().index(index))
@@ -8,35 +8,55 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ilm.ErrorStep;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.ilm.LifecycleExecutionState;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.PhaseExecutionInfo;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction;
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Request;
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction.Response;
import org.elasticsearch.xpack.ilm.IndexLifecycleTransition;

import java.io.IOException;
import java.time.Instant;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
* This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well
@@ -45,12 +65,17 @@
public class TransportPutLifecycleAction extends TransportMasterNodeAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportPutLifecycleAction.class);
+private final NamedXContentRegistry xContentRegistry;
+private final Client client;

@Inject
public TransportPutLifecycleAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
-ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
+ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+NamedXContentRegistry namedXContentRegistry, Client client) {
super(PutLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new,
indexNameExpressionResolver);
+this.xContentRegistry = namedXContentRegistry;
+this.client = client;
}

@Override
@@ -82,7 +107,7 @@ protected Response newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
-ClusterState.Builder newState = ClusterState.builder(currentState);
+ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata == null) { // first time using index-lifecycle feature, bootstrap metadata
currentMetadata = IndexLifecycleMetadata.EMPTY;
@@ -100,13 +125,195 @@ public ClusterState execute(ClusterState currentState) throws Exception {
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
}
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentMetadata.getOperationMode());
-newState.metaData(MetaData.builder(currentState.getMetaData())
+stateBuilder.metaData(MetaData.builder(currentState.getMetaData())
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata).build());
-return newState.build();
+ClusterState nonRefreshedState = stateBuilder.build();
+if (oldPolicy == null) {
+    return nonRefreshedState;
+} else {
+    try {
+        return updateIndicesForPolicy(nonRefreshedState, xContentRegistry, client,
+            oldPolicy.getPolicy(), lifecyclePolicyMetadata);
+    } catch (Exception e) {
+        logger.warn(new ParameterizedMessage("unable to refresh indices phase JSON for updated policy [{}]",
+            oldPolicy.getName()), e);
+        // Revert to the non-refreshed state
+        return nonRefreshedState;
+    }
+}
}
});
}

+    /**
+     * Ensure that we have the minimum amount of metadata necessary to check for cache phase
+     * refresh. This includes:
+     * - An execution state
+     * - Existing phase definition JSON
+     * - A current step key
+     * - A current phase in the step key
+     * - Not currently in the ERROR step
+     */
+    static boolean eligibleToCheckForRefresh(final IndexMetaData metaData) {
+        LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData);
+        if (executionState == null || executionState.getPhaseDefinition() == null) {
+            return false;
+        }
+
+        Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
+        if (currentStepKey == null || currentStepKey.getPhase() == null) {
+            return false;
+        }
+
+        return ErrorStep.NAME.equals(currentStepKey.getName()) == false;
+    }

+    /**
+     * Parse the {@code phaseDef} phase definition to get the stepkeys for the given phase.
+     * If there is an error parsing or if the phase definition is missing the required
+     * information, returns null.
+     */
+    @Nullable
+    static Set<Step.StepKey> readStepKeys(final NamedXContentRegistry xContentRegistry, final Client client,
+                                          final String phaseDef, final String currentPhase) {
+        final PhaseExecutionInfo phaseExecutionInfo;
+        try (XContentParser parser = JsonXContent.jsonXContent.createParser(xContentRegistry,
+            DeprecationHandler.THROW_UNSUPPORTED_OPERATION, phaseDef)) {
+            phaseExecutionInfo = PhaseExecutionInfo.parse(parser, currentPhase);
+        } catch (Exception e) {
+            logger.trace(new ParameterizedMessage("exception reading step keys checking for refreshability, phase definition: {}",
+                phaseDef), e);
+            return null;
+        }
+
+        if (phaseExecutionInfo == null || phaseExecutionInfo.getPhase() == null) {
+            return null;
+        }
+
+        return phaseExecutionInfo.getPhase().getActions().values().stream()
+            .flatMap(a -> a.toSteps(client, phaseExecutionInfo.getPhase().getName(), null).stream())
+            .map(Step::getKey)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+    }

+    /**
+     * Returns 'true' if the index's cached phase JSON can be safely reread, 'false' otherwise.
+     */
+    static boolean isIndexPhaseDefinitionUpdatable(final NamedXContentRegistry xContentRegistry, final Client client,
+                                                   final IndexMetaData metaData, final LifecyclePolicy newPolicy) {
+        final String index = metaData.getIndex().getName();
+        if (eligibleToCheckForRefresh(metaData) == false) {
+            logger.debug("[{}] does not contain enough information to check for eligibility of refreshing phase", index);
+            return false;
+        }
+        final String policyId = newPolicy.getName();
+
+        final LifecycleExecutionState executionState = LifecycleExecutionState.fromIndexMetadata(metaData);
+        final Step.StepKey currentStepKey = LifecycleExecutionState.getCurrentStepKey(executionState);
+        final String currentPhase = currentStepKey.getPhase();
+
+        final Set<Step.StepKey> newStepKeys = newPolicy.toSteps(client).stream()
+            .map(Step::getKey)
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+
+        if (newStepKeys.contains(currentStepKey) == false) {
+            // The index is on a step that doesn't exist in the new policy, we
+            // can't safely re-read the JSON
+            logger.debug("[{}] updated policy [{}] does not contain the current step key [{}], so the policy phase will not be refreshed",
+                index, policyId, currentStepKey);
+            return false;
+        }
+
+        final String phaseDef = executionState.getPhaseDefinition();
+        final Set<Step.StepKey> oldStepKeys = readStepKeys(xContentRegistry, client, phaseDef, currentPhase);
+        if (oldStepKeys == null) {
+            logger.debug("[{}] unable to parse phase definition for cached policy [{}], policy phase will not be refreshed",
+                index, policyId);
+            return false;
+        }
+
+        final Set<Step.StepKey> oldPhaseStepKeys = oldStepKeys.stream()
+            .filter(sk -> currentPhase.equals(sk.getPhase()))
+            .collect(Collectors.toCollection(LinkedHashSet::new));
+
+        final PhaseExecutionInfo phaseExecutionInfo = new PhaseExecutionInfo(policyId, newPolicy.getPhases().get(currentPhase), 1L, 1L);
+        final String peiJson = Strings.toString(phaseExecutionInfo);
+
+        final Set<Step.StepKey> newPhaseStepKeys = readStepKeys(xContentRegistry, client, peiJson, currentPhase);
+        if (newPhaseStepKeys == null) {
+            logger.debug(new ParameterizedMessage("[{}] unable to parse phase definition for policy [{}] " +
+                "to determine if it could be refreshed", index, policyId));
+            return false;
+        }
+
+        if (newPhaseStepKeys.equals(oldPhaseStepKeys)) {
+            // The new and old phase have the same stepkeys for this current phase, so we can
+            // refresh the definition because we know it won't change the execution flow.
+            logger.debug("[{}] updated policy [{}] contains the same phase step keys and can be refreshed", index, policyId);
+            return true;
+        } else {
+            logger.debug("[{}] updated policy [{}] has different phase step keys and will NOT refresh phase " +
+                "definition as it differs too greatly. old: {}, new: {}",
+                index, policyId, oldPhaseStepKeys, newPhaseStepKeys);
+            return false;
+        }
+    }

+    /**
+     * Rereads the phase JSON for the given index, returning a new cluster state.
+     */
+    static ClusterState refreshPhaseDefinition(final ClusterState state, final String index, final LifecyclePolicyMetadata updatedPolicy) {
+        final IndexMetaData idxMeta = state.metaData().index(index);
+        assert eligibleToCheckForRefresh(idxMeta) : "index " + index + " is missing crucial information needed to refresh phase definition";
+
+        logger.trace("[{}] updating cached phase definition for policy [{}]", index, updatedPolicy.getName());
+        LifecycleExecutionState currentExState = LifecycleExecutionState.fromIndexMetadata(idxMeta);
+
+        String currentPhase = currentExState.getPhase();
+        PhaseExecutionInfo pei = new PhaseExecutionInfo(updatedPolicy.getName(),
+            updatedPolicy.getPolicy().getPhases().get(currentPhase), updatedPolicy.getVersion(), updatedPolicy.getModifiedDate());
+
+        LifecycleExecutionState newExState = LifecycleExecutionState.builder(currentExState)
+            .setPhaseDefinition(Strings.toString(pei, false, false))
+            .build();
+
+        return IndexLifecycleTransition.newClusterStateWithLifecycleState(idxMeta.getIndex(), state, newExState).build();
+    }

+    /**
+     * For the given new policy, returns a new cluster with all updateable indices' phase JSON refreshed.
+     */
+    static ClusterState updateIndicesForPolicy(final ClusterState state, final NamedXContentRegistry xContentRegistry, final Client client,
+                                               final LifecyclePolicy oldPolicy, final LifecyclePolicyMetadata newPolicy) {
+        assert oldPolicy.getName().equals(newPolicy.getName()) : "expected both policies to have the same id but they were: [" +
+            oldPolicy.getName() + "] vs. [" + newPolicy.getName() + "]";
+
+        // No need to update anything if the policies are identical in contents
+        if (oldPolicy.equals(newPolicy.getPolicy())) {
+            logger.debug("policy [{}] is unchanged and no phase definition refresh is needed", oldPolicy.getName());
+            return state;
+        }
+
+        final List<String> indicesThatCanBeUpdated =
+            StreamSupport.stream(Spliterators.spliteratorUnknownSize(state.metaData().indices().valuesIt(), 0), false)
+                .filter(meta -> newPolicy.getName().equals(LifecycleSettings.LIFECYCLE_NAME_SETTING.get(meta.getSettings())))
+                .filter(meta -> isIndexPhaseDefinitionUpdatable(xContentRegistry, client, meta, newPolicy.getPolicy()))
+                .map(meta -> meta.getIndex().getName())
+                .collect(Collectors.toList());
+
+        ClusterState updatedState = state;
+        for (String index : indicesThatCanBeUpdated) {
+            try {
+                updatedState = refreshPhaseDefinition(updatedState, index, newPolicy);
+            } catch (Exception e) {
+                logger.warn(new ParameterizedMessage("[{}] unable to refresh phase definition for updated policy [{}]",
+                    index, newPolicy.getName()), e);
+            }
+        }
+
+        return updatedState;
+    }

@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);