Skip to content

Commit

Permalink
Add a step to wait for the replicate_for time to pass
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo committed Dec 16, 2024
1 parent 0685f9f commit 85ad2c5
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.core.ilm.step.info.EmptyInfo;
import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo;

import java.time.Instant;
import java.util.Objects;
import java.util.function.Supplier;

/**
 * This {@link Step} waits until the {@code replicate_for} time of a searchable_snapshot action has passed.
 * <p>
 * The wait is measured from the phase time recorded in the index's {@link LifecycleExecutionState}: the step
 * reports completion once the current time is no longer before {@code phaseTime + replicateFor}.
 * <p>
 * It's an {@link AsyncWaitStep} rather than a {@link ClusterStateWaitStep} because we aren't guaranteed to
 * receive a new cluster state in a timely fashion when the waiting finishes -- by extending {@link AsyncWaitStep}
 * we are guaranteed to check the condition on each ILM execution.
 */
public class WaitUntilReplicateForTimePassesStep extends AsyncWaitStep {

    public static final String NAME = "check-replicate-for-time-passed";

    private static final Logger logger = LogManager.getLogger(WaitUntilReplicateForTimePassesStep.class);

    /** How long, counted from the phase time, the step keeps reporting "not complete". */
    private final TimeValue replicateFor;
    /** Source of the current time; injectable so that tests can control the clock. */
    private final Supplier<Instant> nowSupplier;

    /**
     * @param key          the key of this step
     * @param nextStepKey  the key of the step to run once the wait is over
     * @param replicateFor the amount of time, measured from the phase time, to wait for
     * @param nowSupplier  supplies the instant that is treated as "now" on each evaluation
     */
    WaitUntilReplicateForTimePassesStep(StepKey key, StepKey nextStepKey, TimeValue replicateFor, Supplier<Instant> nowSupplier) {
        // NOTE(review): the third super argument is presumably the client, which this step never uses -- confirm
        super(key, nextStepKey, null);
        this.replicateFor = replicateFor;
        this.nowSupplier = nowSupplier;
    }

    /**
     * Creates a step that reads the current time from {@link Instant#now()}.
     *
     * @param key          the key of this step
     * @param nextStepKey  the key of the step to run once the wait is over
     * @param replicateFor the amount of time, measured from the phase time, to wait for
     */
    WaitUntilReplicateForTimePassesStep(StepKey key, StepKey nextStepKey, TimeValue replicateFor) {
        this(key, nextStepKey, replicateFor, Instant::now);
    }

    /**
     * @return the configured {@code replicate_for} duration
     */
    public TimeValue getReplicateFor() {
        return this.replicateFor;
    }

    // the time supplier is deliberately not part of hashCode/equals: only the configured duration identifies the step
    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), this.replicateFor);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (obj.getClass() != getClass()) {
            return false;
        }
        WaitUntilReplicateForTimePassesStep other = (WaitUntilReplicateForTimePassesStep) obj;
        return super.equals(obj) && Objects.equals(this.replicateFor, other.replicateFor);
    }

    /**
     * Checks whether the {@code replicate_for} time has elapsed since the phase time of the given index.
     * <p>
     * The listener is notified before this method returns: with {@code false} and an explanatory message while
     * the wait is still in progress, with {@code true} and {@link EmptyInfo#INSTANCE} once it is over, or via
     * {@code onFailure} when the information needed to make the decision is missing.
     *
     * @param metadata      the metadata in which the index is looked up
     * @param index         the index whose lifecycle execution state is inspected
     * @param listener      receives the outcome of the check
     * @param masterTimeout not used by this step
     */
    @Override
    public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
        final IndexMetadata indexMetadata = metadata.index(index);
        assert indexMetadata != null
            : "the index metadata for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";
        if (indexMetadata == null) {
            // only reachable with assertions disabled: report the problem instead of failing with a bare NPE
            listener.onFailure(
                new IllegalStateException(
                    Strings.format("no index metadata found for index [%s] while executing step [%s]", index.getName(), NAME)
                )
            );
            return;
        }

        // reuse the metadata resolved above rather than looking the index up a second time by name only
        final LifecycleExecutionState executionState = indexMetadata.getLifecycleExecutionState();
        assert executionState != null
            : "the lifecycle execution state for index [" + index.getName() + "] must exist in the cluster state for step [" + NAME + "]";

        final Long phaseTime = executionState == null ? null : executionState.phaseTime();
        if (phaseTime == null) {
            // without a phase time there is no starting point to measure the replicate_for duration from
            listener.onFailure(
                new IllegalStateException(
                    Strings.format("no phase time found for index [%s] while executing step [%s]", index.getName(), NAME)
                )
            );
            return;
        }

        final Instant endTime = Instant.ofEpochMilli(saturatedAdd(phaseTime, this.replicateFor.millis()));
        if (nowSupplier.get().isBefore(endTime)) {
            listener.onResponse(
                false,
                new SingleMessageFieldInfo(
                    Strings.format(
                        "Waiting until the replicate_for time [%s] has elapsed for index [%s] before removing replicas.",
                        this.replicateFor,
                        index.getName()
                    )
                )
            );
            return;
        }

        listener.onResponse(true, EmptyInfo.INSTANCE);
    }

    /**
     * Adds two longs, clamping to {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE} instead of wrapping around, so
     * that a very large {@code replicate_for} cannot overflow into an end time that lies in the past.
     */
    private static long saturatedAdd(long a, long b) {
        final long sum = a + b;
        // overflow happened iff both operands share a sign and the sum has the opposite sign
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    @Override
    public boolean isRetryable() {
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xpack.core.ilm.step.info.EmptyInfo;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
import static org.hamcrest.Matchers.is;

/**
 * Unit tests for {@link WaitUntilReplicateForTimePassesStep}: the equals/hashCode/copy plumbing provided through
 * {@link AbstractStepTestCase}, plus the time-based wait condition itself.
 */
public class WaitUntilReplicateForTimePassesStepTests extends AbstractStepTestCase<WaitUntilReplicateForTimePassesStep> {

    @Override
    protected WaitUntilReplicateForTimePassesStep createRandomInstance() {
        Step.StepKey stepKey = randomStepKey();
        Step.StepKey nextStepKey = randomStepKey();
        TimeValue replicateFor = randomPositiveTimeValue();
        return new WaitUntilReplicateForTimePassesStep(stepKey, nextStepKey, replicateFor, Instant::now);
    }

    /**
     * Returns a copy of {@code instance} in which exactly one of key, next key, or replicate_for differs.
     */
    @Override
    protected WaitUntilReplicateForTimePassesStep mutateInstance(WaitUntilReplicateForTimePassesStep instance) {
        Step.StepKey key = instance.getKey();
        Step.StepKey nextKey = instance.getNextStepKey();
        // start from the instance's own value: a fresh random value here would change replicate_for in every
        // branch, and would let branch 2 pick a value equal to the original one
        TimeValue replicateFor = instance.getReplicateFor();

        switch (between(0, 2)) {
            case 0 -> key = new Step.StepKey(key.phase(), key.action(), key.name() + randomAlphaOfLength(5));
            case 1 -> nextKey = new Step.StepKey(nextKey.phase(), nextKey.action(), nextKey.name() + randomAlphaOfLength(5));
            case 2 -> replicateFor = randomValueOtherThan(replicateFor, ESTestCase::randomPositiveTimeValue);
            default -> throw new AssertionError("Illegal randomisation branch");
        }
        return new WaitUntilReplicateForTimePassesStep(key, nextKey, replicateFor, Instant::now);
    }

    @Override
    protected WaitUntilReplicateForTimePassesStep copyInstance(WaitUntilReplicateForTimePassesStep instance) {
        return new WaitUntilReplicateForTimePassesStep(
            instance.getKey(),
            instance.getNextStepKey(),
            instance.getReplicateFor(),
            Instant::now
        );
    }

    public void testEvaluateCondition() {
        // a mutable box that we can put Instants into
        final Instant[] returnVal = new Instant[1];

        final WaitUntilReplicateForTimePassesStep step = new WaitUntilReplicateForTimePassesStep(
            randomStepKey(),
            randomStepKey(),
            TimeValue.timeValueHours(1),
            () -> returnVal[0]
        );

        final Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
        final Instant t1 = now.minus(2, ChronoUnit.HOURS);
        final Instant t2 = now.plus(2, ChronoUnit.HOURS);

        final IndexMetadata indexMeta = getIndexMetadata(randomAlphaOfLengthBetween(10, 30), randomAlphaOfLengthBetween(10, 30), step);
        final Metadata metadata = Metadata.builder().put(indexMeta, true).build();
        final Index index = indexMeta.getIndex();

        // if we evaluate the condition now, it hasn't been met, because it hasn't been an hour
        returnVal[0] = now;
        assertEvaluation(step, metadata, index, false);

        // similarly, if we were in the past, enough time also wouldn't have passed
        returnVal[0] = t1;
        assertEvaluation(step, metadata, index, false);

        // but two hours from now in the future, an hour will have passed
        returnVal[0] = t2;
        assertEvaluation(step, metadata, index, true);
    }

    /**
     * Evaluates the step's condition and asserts that the listener is notified exactly as expected: a response
     * with the given completion flag (and {@link EmptyInfo#INSTANCE} when complete), and never a failure.
     */
    private void assertEvaluation(WaitUntilReplicateForTimePassesStep step, Metadata metadata, Index index, boolean expectedComplete) {
        // a mutable box recording that the listener was actually called, so a silent no-op cannot pass the test
        final boolean[] responded = new boolean[1];

        step.evaluateCondition(metadata, index, new AsyncWaitStep.Listener() {
            @Override
            public void onResponse(boolean complete, ToXContentObject informationContext) {
                responded[0] = true;
                assertThat(complete, is(expectedComplete));
                if (expectedComplete) {
                    assertThat(informationContext, is(EmptyInfo.INSTANCE));
                }
            }

            @Override
            public void onFailure(Exception e) {
                throw new AssertionError("Unexpected method call", e);
            }
        }, MASTER_TIMEOUT);

        assertTrue("expected the listener to have been notified", responded[0]);
    }

    /**
     * Builds index metadata whose lifecycle execution state points at the given step, with the phase, action and
     * step times all set to the current wall-clock time.
     */
    private IndexMetadata getIndexMetadata(String index, String lifecycleName, WaitUntilReplicateForTimePassesStep step) {
        IndexMetadata idxMetadata = IndexMetadata.builder(index)
            .settings(settings(IndexVersion.current()).put(LifecycleSettings.LIFECYCLE_NAME, lifecycleName))
            .numberOfShards(randomIntBetween(1, 5))
            .numberOfReplicas(randomIntBetween(0, 5))
            .build();

        LifecycleExecutionState.Builder lifecycleState = LifecycleExecutionState.builder();
        lifecycleState.setPhase(step.getKey().phase());
        lifecycleState.setAction(step.getKey().action());
        lifecycleState.setStep(step.getKey().name());
        long stateTimes = System.currentTimeMillis();
        lifecycleState.setPhaseTime(stateTimes);
        lifecycleState.setActionTime(stateTimes);
        lifecycleState.setStepTime(stateTimes);
        lifecycleState.setIndexCreationDate(randomNonNegativeLong());
        return IndexMetadata.builder(idxMetadata).putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.build().asMap()).build();
    }
}

0 comments on commit 85ad2c5

Please sign in to comment.