Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rollup] improve stopping tests #55666

Merged
merged 1 commit into from
Apr 24, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
package org.elasticsearch.xpack.rollup.job;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
Expand All @@ -19,6 +22,7 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState;
Expand Down Expand Up @@ -197,83 +201,95 @@ public void onFailure(Exception e) {

public void testStartWhenStopping() throws InterruptedException {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(pool);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 2 times");
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch unblock = new CountDownLatch(1);
try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 3 times");
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}

@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);
@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);

task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));

// wait until the search request is send, this is unblocked in the client
block.await(3, TimeUnit.SECONDS);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}

task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});

task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}
// we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state
assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING));

@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});
CountDownLatch latch2 = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
fail("should not have entered onResponse");
}

CountDownLatch latch2 = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
fail("should not have entered onResponse");
}
@Override
public void onFailure(Exception e) {
assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job ["
+ job.getConfig().getId() + "] because state was [STOPPING]"));
latch2.countDown();
}
});
latch2.await(3, TimeUnit.SECONDS);

@Override
public void onFailure(Exception e) {
assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job ["
+ job.getConfig().getId() + "] because state was [STOPPING]"));
latch2.countDown();
}
});
latch2.await(3, TimeUnit.SECONDS);
// the the client answer
unblock.countDown();
}
}

public void testStartWhenStopped() throws InterruptedException {
Expand Down Expand Up @@ -685,85 +701,94 @@ public void onFailure(Exception e) {

public void testStopWhenStopping() throws InterruptedException {
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
Client client = mock(Client.class);
when(client.settings()).thenReturn(Settings.EMPTY);
when(client.threadPool()).thenReturn(pool);
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 3 times");
final CountDownLatch block = new CountDownLatch(1);
final CountDownLatch unblock = new CountDownLatch(1);
try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) {
SchedulerEngine schedulerEngine = mock(SchedulerEngine.class);

AtomicInteger counter = new AtomicInteger(0);
TaskId taskId = new TaskId("node", 123);
RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job,
null, client, schedulerEngine, pool, Collections.emptyMap()) {
@Override
public void updatePersistentTaskState(PersistentTaskState taskState,
ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
assertThat(taskState, instanceOf(RollupJobStatus.class));
int c = counter.get();
if (c == 0) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED));
} else if (c == 1) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 2) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else if (c == 3) {
assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED));
} else {
fail("Should not have updated persistent statuses > 4 times");
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();
}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1,
new PersistentTasksCustomMetadata.Assignment("foo", "foo")));
counter.incrementAndGet();

}
};
task.init(null, mock(TaskManager.class), taskId.toString(), 123);
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
assertNull(((RollupJobStatus)task.getStatus()).getPosition());
@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);

CountDownLatch latch = new CountDownLatch(1);
task.start(new ActionListener<StartRollupJobAction.Response>() {
@Override
public void onResponse(StartRollupJobAction.Response response) {
assertTrue(response.isStarted());
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
latch.countDown();
}
task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));

@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch.await(3, TimeUnit.SECONDS);
// wait until the search request is send, this is unblocked in the client
block.await(3, TimeUnit.SECONDS);

task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123));
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING));
assertThat(task.getStats().getNumInvocations(), equalTo(1L));
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}

task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
}
@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});

@Override
public void onFailure(Exception e) {
fail("should not have entered onFailure");
}
});
// we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state
assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING));

CountDownLatch latch2 = new CountDownLatch(1);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
latch2.countDown();
}
CountDownLatch latch2 = new CountDownLatch(1);
task.stop(new ActionListener<StopRollupJobAction.Response>() {
@Override
public void onResponse(StopRollupJobAction.Response response) {
assertTrue(response.isStopped());
latch2.countDown();
}

@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch2.await(3, TimeUnit.SECONDS);
@Override
public void onFailure(Exception e) {
fail("Should not have entered onFailure");
}
});
latch2.await(3, TimeUnit.SECONDS);
unblock.countDown();
}
}

public void testStopWhenAborting() throws InterruptedException {
Expand Down Expand Up @@ -807,4 +832,21 @@ public void onFailure(Exception e) {
});
latch.await(3, TimeUnit.SECONDS);
}

private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) {
return new NoOpClient(getTestName()) {
@SuppressWarnings("unchecked")
@Override
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
try {
unblock.countDown();
block.await(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
fail("Should not have timed out");
}
listener.onResponse((Response) mock(SearchResponse.class));
}
};
}
}