From f8c9f4c2b402c1da11fcfadc336ab537b654427c Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 23 Apr 2020 14:31:13 +0200 Subject: [PATCH] improve tests related to stopping using a client that answers and can be synchronized with the test thread in order to test special situations --- .../xpack/rollup/job/RollupJobTaskTests.java | 320 ++++++++++-------- 1 file changed, 181 insertions(+), 139 deletions(-) diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 0332ceb950dc4..6ff7834c9956c 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -6,6 +6,9 @@ package org.elasticsearch.xpack.rollup.job; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.Client; @@ -19,6 +22,7 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; @@ -197,83 +201,95 @@ public void onFailure(Exception e) { public void testStartWhenStopping() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); - Client client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); - when(client.threadPool()).thenReturn(pool); - SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - AtomicInteger counter = new AtomicInteger(0); - TaskId taskId = new TaskId("node", 123); - RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { - @Override - public void updatePersistentTaskState(PersistentTaskState taskState, - ActionListener> listener) { - assertThat(taskState, instanceOf(RollupJobStatus.class)); - int c = counter.get(); - if (c == 0) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); - } else if (c == 1) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); - } else { - fail("Should not have updated persistent statuses > 2 times"); + final CountDownLatch block = new CountDownLatch(1); + final CountDownLatch unblock = new CountDownLatch(1); + try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) { + SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); + + AtomicInteger counter = new AtomicInteger(0); + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, + null, client, schedulerEngine, pool, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + int c = counter.get(); + if (c == 0) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); + } else if (c == 1) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else if (c == 2) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else { + fail("Should not have updated persistent statuses > 3 times"); + } + listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, + new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); + counter.incrementAndGet(); + } + }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); + assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + + CountDownLatch latch = new CountDownLatch(1); + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + assertTrue(response.isStarted()); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); + latch.countDown(); } - listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); - counter.incrementAndGet(); - } - }; - task.init(null, mock(TaskManager.class), taskId.toString(), 123); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); - assertNull(((RollupJobStatus)task.getStatus()).getPosition()); - - CountDownLatch latch = new CountDownLatch(1); - task.start(new ActionListener() { - @Override - public void onResponse(StartRollupJobAction.Response response) { - assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); - latch.countDown(); - } - @Override - public void onFailure(Exception e) { - fail("Should not have entered onFailure"); - } - }); - latch.await(3, TimeUnit.SECONDS); + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + latch.await(3, TimeUnit.SECONDS); + + task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); + assertThat(task.getStats().getNumInvocations(), equalTo(1L)); + + // wait until the search request is send, this is unblocked in the client + block.await(3, TimeUnit.SECONDS); + task.stop(new ActionListener() { + @Override + public void onResponse(StopRollupJobAction.Response response) { + assertTrue(response.isStopped()); + } - task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); - assertThat(task.getStats().getNumInvocations(), equalTo(1L)); + @Override + public void onFailure(Exception e) { + fail("should not have entered onFailure"); + } + }); - task.stop(new ActionListener() { - @Override - public void onResponse(StopRollupJobAction.Response response) { - assertTrue(response.isStopped()); - } + // we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state + assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING)); - @Override - public void onFailure(Exception e) { - fail("should not have entered onFailure"); - } - }); + CountDownLatch latch2 = new CountDownLatch(1); + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + fail("should not have entered onResponse"); + } - CountDownLatch latch2 = new CountDownLatch(1); - task.start(new ActionListener() { - @Override - public void onResponse(StartRollupJobAction.Response response) { - fail("should not have entered onResponse"); - } + @Override + public void onFailure(Exception e) { + assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job [" + + job.getConfig().getId() + "] because state was [STOPPING]")); + latch2.countDown(); + } + }); + latch2.await(3, TimeUnit.SECONDS); - @Override - public void onFailure(Exception e) { - assertThat(e.getMessage(), equalTo("Cannot start task for Rollup Job [" - + job.getConfig().getId() + "] because state was [STOPPING]")); - latch2.countDown(); - } - }); - latch2.await(3, TimeUnit.SECONDS); + // the the client answer + unblock.countDown(); + } } public void testStartWhenStopped() throws InterruptedException { @@ -685,85 +701,94 @@ public void onFailure(Exception e) { public void testStopWhenStopping() throws InterruptedException { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); - Client client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); - when(client.threadPool()).thenReturn(pool); - SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); - - AtomicInteger counter = new AtomicInteger(0); - TaskId taskId = new TaskId("node", 123); - RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, - null, client, schedulerEngine, pool, Collections.emptyMap()) { - @Override - public void updatePersistentTaskState(PersistentTaskState taskState, - ActionListener> listener) { - assertThat(taskState, instanceOf(RollupJobStatus.class)); - int c = counter.get(); - if (c == 0) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); - } else if (c == 1) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); - } else if (c == 2) { - assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); - } else { - fail("Should not have updated persistent statuses > 3 times"); + final CountDownLatch block = new CountDownLatch(1); + final CountDownLatch unblock = new CountDownLatch(1); + try (NoOpClient client = getEmptySearchResponseClient(block, unblock)) { + SchedulerEngine schedulerEngine = mock(SchedulerEngine.class); + + AtomicInteger counter = new AtomicInteger(0); + TaskId taskId = new TaskId("node", 123); + RollupJobTask task = new RollupJobTask(1, "type", "action", taskId, job, + null, client, schedulerEngine, pool, Collections.emptyMap()) { + @Override + public void updatePersistentTaskState(PersistentTaskState taskState, + ActionListener> listener) { + assertThat(taskState, instanceOf(RollupJobStatus.class)); + int c = counter.get(); + if (c == 0) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STARTED)); + } else if (c == 1) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else if (c == 2) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else if (c == 3) { + assertThat(((RollupJobStatus) taskState).getIndexerState(), equalTo(IndexerState.STOPPED)); + } else { + fail("Should not have updated persistent statuses > 4 times"); + } + listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, + new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); + counter.incrementAndGet(); + } + }; + task.init(null, mock(TaskManager.class), taskId.toString(), 123); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); + assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + + CountDownLatch latch = new CountDownLatch(1); + task.start(new ActionListener() { + @Override + public void onResponse(StartRollupJobAction.Response response) { + assertTrue(response.isStarted()); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); + latch.countDown(); } - listener.onResponse(new PersistentTasksCustomMetadata.PersistentTask<>("foo", RollupField.TASK_NAME, job, 1, - new PersistentTasksCustomMetadata.Assignment("foo", "foo"))); - counter.incrementAndGet(); - } - }; - task.init(null, mock(TaskManager.class), taskId.toString(), 123); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); - assertNull(((RollupJobStatus)task.getStatus()).getPosition()); + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + latch.await(3, TimeUnit.SECONDS); - CountDownLatch latch = new CountDownLatch(1); - task.start(new ActionListener() { - @Override - public void onResponse(StartRollupJobAction.Response response) { - assertTrue(response.isStarted()); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); - latch.countDown(); - } + task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); + assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); + assertThat(task.getStats().getNumInvocations(), equalTo(1L)); - @Override - public void onFailure(Exception e) { - fail("Should not have entered onFailure"); - } - }); - latch.await(3, TimeUnit.SECONDS); + // wait until the search request is send, this is unblocked in the client + block.await(3, TimeUnit.SECONDS); - task.triggered(new SchedulerEngine.Event(RollupJobTask.SCHEDULE_NAME + "_" + job.getConfig().getId(), 123, 123)); - assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.INDEXING)); - assertThat(task.getStats().getNumInvocations(), equalTo(1L)); + task.stop(new ActionListener() { + @Override + public void onResponse(StopRollupJobAction.Response response) { + assertTrue(response.isStopped()); + } - task.stop(new ActionListener() { - @Override - public void onResponse(StopRollupJobAction.Response response) { - assertTrue(response.isStopped()); - } + @Override + public void onFailure(Exception e) { + fail("should not have entered onFailure"); + } + }); - @Override - public void onFailure(Exception e) { - fail("should not have entered onFailure"); - } - }); + // we issued stop but the indexer is waiting for the search response, therefore we should be in stopping state + assertThat(((RollupJobStatus) task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPING)); - CountDownLatch latch2 = new CountDownLatch(1); - task.stop(new ActionListener() { - @Override - public void onResponse(StopRollupJobAction.Response response) { - assertTrue(response.isStopped()); - latch2.countDown(); - } + CountDownLatch latch2 = new CountDownLatch(1); + task.stop(new ActionListener() { + @Override + public void onResponse(StopRollupJobAction.Response response) { + assertTrue(response.isStopped()); + latch2.countDown(); + } - @Override - public void onFailure(Exception e) { - fail("Should not have entered onFailure"); - } - }); - latch2.await(3, TimeUnit.SECONDS); + @Override + public void onFailure(Exception e) { + fail("Should not have entered onFailure"); + } + }); + latch2.await(3, TimeUnit.SECONDS); + unblock.countDown(); + } } public void testStopWhenAborting() throws InterruptedException { @@ -807,4 +832,21 @@ public void onFailure(Exception e) { }); latch.await(3, TimeUnit.SECONDS); } + + private NoOpClient getEmptySearchResponseClient(CountDownLatch unblock, CountDownLatch block) { + return new NoOpClient(getTestName()) { + @SuppressWarnings("unchecked") + @Override + protected + void doExecute(ActionType action, Request request, ActionListener listener) { + try { + unblock.countDown(); + block.await(3, TimeUnit.SECONDS); + } catch (InterruptedException e) { + fail("Should not have timed out"); + } + listener.onResponse((Response) mock(SearchResponse.class)); + } + }; + } }