Skip to content

Commit

Permalink
[Rollup] improve stopping tests (#55666)
Browse files Browse the repository at this point in the history
improve tests related to stopping using a client that answers and can be
synchronized with the test thread in order to test special situations

relates #55011
  • Loading branch information
Hendrik Muhs authored Apr 24, 2020
1 parent 87538e4 commit dca44f6
Showing 1 changed file with 181 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.elasticsearch.xpack.rollup.job;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
Expand All @@ -19,6 +22,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState;
Expand Down Expand Up @@ -197,83 +201,95 @@ public void onFailure(Exception e) {

public void testStartWhenStopping() throws InterruptedException {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(pool);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 2 times");
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch unblock = new CountDownLatch(1);
try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 3 times");
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);
@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);

task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));

// wait until the search request has been sent; the client counts this latch down when it receives the request
block.await(3, TimeUnit.SECONDS);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}

task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});

task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}
// we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state
assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING));

@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});
CountDownLatch latch2 = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
fail("should not have entered onResponse");
}

CountDownLatch latch2 = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
fail("should not have entered onResponse");
}
@Override
public void onFailure(Exception e) {
assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job ["
+ job.getConfig().getId() + "] because state was [STOPPING]"));
latch2.countDown();
}
});
latch2.await(3, TimeUnit.SECONDS);

@Override
public void onFailure(Exception e) {
assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job ["
+ job.getConfig().getId() + "] because state was [STOPPING]"));
latch2.countDown();
}
});
latch2.await(3, TimeUnit.SECONDS);
// let the client answer
unblock.countDown();
}
}

public void testStartWhenStopped() throws InterruptedException {
Expand Down Expand Up @@ -685,85 +701,94 @@ public void onFailure(Exception e) {

public void testStopWhenStopping() throws InterruptedException {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(pool);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 3 times");
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch unblock = new CountDownLatch(1);
try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 3) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 4 times");
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();

}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));

@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);
// wait until the search request has been sent; the client counts this latch down when it receives the request
block.await(3, TimeUnit.SECONDS);

task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}

task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}
@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});

@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});
// we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state
assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING));

CountDownLatch latch2 = new CountDownLatch(1);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
latch2.countDown();
}
CountDownLatch latch2 = new CountDownLatch(1);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
latch2.countDown();
}

@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch2.await(3, TimeUnit.SECONDS);
@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch2.await(3, TimeUnit.SECONDS);
unblock.countDown();
}
}

public void testStopWhenAborting() throws InterruptedException {
Expand Down Expand Up @@ -807,4 +832,21 @@ public void onFailure(Exception e) {
});
latch.await(3, TimeUnit.SECONDS);
}

/**
 * Creates a client that lets the test thread synchronize with an in-flight request.
 *
 * For every request the client first counts down {@code requestReceived}, then waits up to
 * 3 seconds on {@code releaseResponse}, and finally answers the listener with a mocked
 * {@link SearchResponse}.
 *
 * @param requestReceived counted down as soon as a request reaches the client; the test awaits
 *                        this latch to know the request is in flight
 * @param releaseResponse awaited by the client before it answers; the test counts this latch
 *                        down to let the response through
 * @return a {@link NoOpClient} named after the running test; callers close it
 *         (the tests use try-with-resources)
 */
private NoOpClient getEmptySearchResponseClient(CountDownLatch requestReceived, CountDownLatch releaseResponse) {
    return new NoOpClient(getTestName()) {
        // NOTE(review): the cast below is only sound if every action sent through this client
        // expects a SearchResponse -- confirm against the callers.
        @SuppressWarnings("unchecked")
        @Override
        protected <Request extends ActionRequest, Response extends ActionResponse>
        void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
            // tell the test thread that the request has arrived
            requestReceived.countDown();
            try {
                // await() reports a timeout through its return value, not by throwing
                assertTrue("Timed out waiting for the test to release the response",
                    releaseResponse.await(3, TimeUnit.SECONDS));
            } catch (InterruptedException e) {
                // keep the interrupt visible to the owning thread before failing
                Thread.currentThread().interrupt();
                fail("Interrupted while waiting for the test to release the response");
            }
            listener.onResponse((Response) mock(SearchResponse.class));
        }
    };
}
}

0 comments on commit dca44f6

Please sign in to comment.