Skip to content

Commit

Permalink
Replace failing PolicyStepsRegistry sanity check assert with a test (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joegallo authored Mar 28, 2022
1 parent bd6b27c commit ff517c5
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,6 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe
final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
if (s != null) {
cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s));
// assert that the cache works as expected -- that is, if we put something into the cache,
// we should get back the same thing if we were to invoke getStep again with the same arguments
assert s == getCachedStep(indexMetadata, stepKey) : "policy step registry cache failed sanity check";
}
return s;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -465,4 +467,76 @@ public void testUpdatePolicyButNoPhaseChangeIndexStepsDontChange() throws Except
assertThat(((ShrinkStep) shrinkStep).getNumberOfShards(), equalTo(2));
assertThat(((ShrinkStep) gotStep).getNumberOfShards(), equalTo(1));
}

public void testGetStepMultithreaded() throws Exception {
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);

LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases("policy");
String phaseName = randomFrom(policy.getPhases().keySet());
Phase phase = policy.getPhases().get(phaseName);

LifecycleExecutionState lifecycleState = LifecycleExecutionState.builder()
.setPhaseDefinition(Strings.toString(new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong())))
.build();
IndexMetadata indexMetadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_NAME, "policy")
.build()
)
.putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.asMap())
.build();

SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>();
metas.put("policy", new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong()));
IndexLifecycleMetadata meta = new IndexLifecycleMetadata(metas, OperationMode.RUNNING);

PolicyStepsRegistry registry = new PolicyStepsRegistry(REGISTRY, client, null);
registry.update(meta);

// test a variety of getStep calls with random actions and steps
for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) {
LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values()));
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null));
// if the step's key is different from the previous iteration of the loop, then the cache will be updated, and we'll
// get a non-cached response. if the step's key happens to be the same as the previous iteration of the loop, then
// we'll get a cached response. so this loop randomly tests both cached and non-cached responses.
Step actualStep = registry.getStep(indexMetadata, step.getKey());
assertThat(actualStep.getKey(), equalTo(step.getKey()));
}

final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean(false);

// now, in another thread, update the registry repeatedly as fast as possible.
// updating the registry has the side effect of clearing the cache.
Thread t = new Thread(() -> {
latch.countDown(); // signal that we're starting
while (done.get() == false) {
registry.update(meta);
}
});
t.start();

try {
latch.await(); // wait until the other thread started

// and, while the cache is being repeatedly cleared,
// test a variety of getStep calls with random actions and steps
for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) {
LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values()));
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null));
Step actualStep = registry.getStep(indexMetadata, step.getKey());
assertThat(actualStep.getKey(), equalTo(step.getKey()));
}
} finally {
// tell the other thread we're finished and wait for it to die
done.set(true);
t.join(1000);
}
}
}

0 comments on commit ff517c5

Please sign in to comment.