Skip to content

Commit

Permalink
Watcher fix ActivateWatchTests (elastic#82157)
Browse files Browse the repository at this point in the history
This commit attempts to fix a set of Watcher tests that can
fail due to unexpected execution of Watches after Watcher has been stopped.

The theory here is that a Watch can be queued but not fully executed
then Watcher is shutdown, the test does some clean up, then the
queued Watch finishes execution and causes some some additional cleanup
to fail.

The change here ensures that when Watcher is stopped from AbstractWatcherIntegrationTestCase
that it will also wait until there are no more current Watches executing.

closes elastic#66495
  • Loading branch information
jakelandis authored Jan 4, 2022
1 parent 550eb32 commit 912cf94
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ protected void ensureLicenseEnabled() throws Exception {

protected void stopWatcher() throws Exception {
assertBusy(() -> {
WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).get();

WatcherStatsResponse watcherStatsResponse = new WatcherStatsRequestBuilder(client()).setIncludeCurrentWatches(true).get();
assertThat(watcherStatsResponse.hasFailures(), is(false));
List<Tuple<String, WatcherState>> currentStatesFromStatsRequest = watcherStatsResponse.getNodes()
.stream()
Expand All @@ -580,7 +581,8 @@ protected void stopWatcher() throws Exception {
.collect(Collectors.toList());
List<WatcherState> states = currentStatesFromStatsRequest.stream().map(Tuple::v2).collect(Collectors.toList());

logger.info("waiting to stop watcher, current states {}", currentStatesFromStatsRequest);
long currentWatches = watcherStatsResponse.getNodes().stream().mapToLong(n -> n.getSnapshots().size()).sum();
logger.info("waiting to stop watcher, current states {}, current watches [{}]", currentStatesFromStatsRequest, currentWatches);

boolean isAllStateStarted = states.stream().allMatch(w -> w == WatcherState.STARTED);
if (isAllStateStarted) {
Expand All @@ -594,7 +596,7 @@ protected void stopWatcher() throws Exception {
}

boolean isAllStateStopped = states.stream().allMatch(w -> w == WatcherState.STOPPED);
if (isAllStateStopped) {
if (isAllStateStopped && currentWatches == 0) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ protected boolean timeWarped() {
return false;
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66495")
public void testDeactivateAndActivate() throws Exception {
PutWatchResponse putWatchResponse = new PutWatchRequestBuilder(client()).setId("_id")
.setSource(
Expand Down Expand Up @@ -107,7 +106,6 @@ public void testDeactivateAndActivate() throws Exception {
});
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/66495")
public void testLoadWatchWithoutAState() throws Exception {
PutWatchResponse putWatchResponse = new PutWatchRequestBuilder(client()).setId("_id")
.setSource(
Expand Down

0 comments on commit 912cf94

Please sign in to comment.