Skip to content

Commit

Permalink
Add test for the triggered watches index
Browse files Browse the repository at this point in the history
  • Loading branch information
williamrandolph committed Apr 18, 2024
1 parent ccabeb1 commit 74506d3
Showing 1 changed file with 64 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.xpack.watcher;

import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.indices.SystemIndexThreadPoolTestCase;
Expand All @@ -15,23 +16,33 @@
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.watcher.execution.Wid;
import org.elasticsearch.xpack.core.watcher.transport.actions.QueryWatchesAction;
import org.elasticsearch.xpack.core.watcher.transport.actions.get.GetWatchRequestBuilder;
import org.elasticsearch.xpack.core.watcher.transport.actions.put.PutWatchRequestBuilder;
import org.elasticsearch.xpack.core.watcher.watch.Watch;
import org.elasticsearch.xpack.ilm.IndexLifecycle;
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatch;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.test.LocalStateWatcher;
import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleTriggerEvent;

import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class WatcherThreadPoolTests extends SystemIndexThreadPoolTestCase {

Expand Down Expand Up @@ -62,15 +73,15 @@ protected Collection<Class<? extends Plugin>> getMockPlugins() {
return plugins;
}

/**
* The main watcher index (.watches) can be tested through the watcher API
*/
public void testWatcherThreadPools() {
runWithBlockedThreadPools(() -> {
{
// write
var response = new PutWatchRequestBuilder(client(), "test-watch").setSource(
watchBuilder().trigger(schedule(interval("3m")))
.input(noneInput())
.condition(InternalAlwaysCondition.INSTANCE)
.addAction("indexer", indexAction("test-index"))
watchBuilder().trigger(schedule(interval("3m"))).input(noneInput()).condition(InternalAlwaysCondition.INSTANCE)
).get();
assertTrue(response.isCreated());
}
Expand All @@ -89,4 +100,52 @@ public void testWatcherThreadPools() {
}
});
}

/**
* This test uses an instance of TriggeredWatchStore directly because the public
* API doesn't seem to return much information that indicates changes in the
* underlying index.
*/
public void testTriggeredWatchesIndex() {
internalCluster().getInstances(TriggerService.class).forEach(TriggerService::pauseExecution);

TriggeredWatchStore watchStore = internalCluster().getInstance(TriggeredWatchStore.class);

List<Watch> fakeWatches = getFakeWatches(List.of("fake-patek-phillipe", "fake-tag-heuer"));
List<Wid> wids = fakeWatches.stream().map(w -> new Wid(w.id(), ZonedDateTime.now())).toList();

runWithBlockedThreadPools(() -> {
try {
BulkResponse bulkResponse = watchStore.putAll(
wids.stream()
.map(w -> new TriggeredWatch(w, new ScheduleTriggerEvent(ZonedDateTime.now(), ZonedDateTime.now())))
.toList()
);
assertFalse(bulkResponse.hasFailures());
assertBusy(() -> {
Collection<TriggeredWatch> triggeredWatches1 = watchStore.findTriggeredWatches(fakeWatches, clusterService().state());
assertThat(triggeredWatches1, hasSize(2));
});

wids.forEach(watchStore::delete);

assertBusy(() -> {
Collection<TriggeredWatch> triggeredWatches2 = watchStore.findTriggeredWatches(fakeWatches, clusterService().state());
assertThat(triggeredWatches2, hasSize(0));
});
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
internalCluster().getInstances(TriggerService.class).forEach(triggerService -> triggerService.start(List.of()));
}
});
}

private static List<Watch> getFakeWatches(List<String> watchNames) {
return watchNames.stream().map(n -> {
Watch fakeWatch = mock(Watch.class);
when(fakeWatch.id()).thenReturn(n);
return fakeWatch;
}).toList();
}
}

0 comments on commit 74506d3

Please sign in to comment.