Skip to content

Commit

Permalink
add more unit tests for AD transport actions (#498)
Browse files Browse the repository at this point in the history
Signed-off-by: Xun Zhang <[email protected]>
  • Loading branch information
Zhangxunmt authored Apr 15, 2022
1 parent 52bb9c8 commit 9a25a4d
Show file tree
Hide file tree
Showing 8 changed files with 796 additions and 7 deletions.
8 changes: 1 addition & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -546,17 +546,11 @@ List<String> jacocoExclusions = [
//'org.opensearch.ad.common.exception.AnomalyDetectionException',
'org.opensearch.ad.util.ClientUtil',

'org.opensearch.ad.transport.StopDetectorRequest',
'org.opensearch.ad.transport.StopDetectorResponse',
'org.opensearch.ad.transport.CronRequest',
'org.opensearch.ad.AnomalyDetectorRunner',

// related to transport actions added for security
'org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction*',
'org.opensearch.ad.transport.GetAnomalyDetectorTransportAction*',
'org.opensearch.ad.transport.SearchAnomalyResultTransportAction*',
'org.opensearch.ad.transport.SearchAnomalyDetectorInfoTransportAction*',
'org.opensearch.ad.transport.AnomalyDetectorJobRequest',
'org.opensearch.ad.transport.DeleteAnomalyDetectorTransportAction.1',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction',
Expand Down
24 changes: 24 additions & 0 deletions src/test/java/org/opensearch/ad/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package org.opensearch.ad;

import static org.apache.http.entity.ContentType.APPLICATION_JSON;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
Expand Down Expand Up @@ -108,6 +109,7 @@
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -1495,4 +1497,26 @@ public static DetectorValidationIssue randomDetectorValidationIssueWithDetectorI
);
return issue;
}

public static ClusterState createClusterState() {
ImmutableOpenMap<String, IndexMetadata> immutableOpenMap = ImmutableOpenMap
.<String, IndexMetadata>builder()
.fPut(
ANOMALY_DETECTOR_JOB_INDEX,
IndexMetadata
.builder("test")
.settings(
Settings
.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put("index.version.created", Version.CURRENT.id)
)
.build()
)
.build();
Metadata metaData = Metadata.builder().indices(immutableOpenMap).build();
ClusterState clusterState = new ClusterState(new ClusterName("test_name"), 1l, "uuid", metaData, null, null, null, null, 1, true);
return clusterState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -25,6 +26,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.DetectionDateRange;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -112,6 +114,18 @@ public void testAdJobAction() {

@Test
public void testAdJobRequest() throws IOException {
DetectionDateRange detectionDateRange = new DetectionDateRange(Instant.MIN, Instant.now());
request = new AnomalyDetectorJobRequest("1234", detectionDateRange, false, 4567, 7890, "_start");

BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput input = out.bytes().streamInput();
AnomalyDetectorJobRequest newRequest = new AnomalyDetectorJobRequest(input);
Assert.assertEquals(request.getDetectorID(), newRequest.getDetectorID());
}

@Test
public void testAdJobRequest_NullDetectionDateRange() throws IOException {
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput input = out.bytes().streamInput();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.ad.transport;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX;
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.function.Consumer;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.ad.AbstractADTest;
import org.opensearch.ad.TestHelpers;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.IntervalTimeConfiguration;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.index.get.GetResult;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;

public class DeleteAnomalyDetectorTests extends AbstractADTest {
private DeleteAnomalyDetectorTransportAction action;
private TransportService transportService;
private ActionFilters actionFilters;
private Client client;
private ADTaskManager adTaskManager;
private PlainActionFuture<DeleteResponse> future;
private DeleteResponse deleteResponse;
private GetResponse getResponse;
ClusterService clusterService;
private AnomalyDetectorJob jobParameter;

@BeforeClass
public static void setUpBeforeClass() {
setUpThreadPool(EntityProfileTests.class.getSimpleName());
}

@AfterClass
public static void tearDownAfterClass() {
tearDownThreadPool();
}

@Override
public void setUp() throws Exception {
super.setUp();
clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(
Settings.EMPTY,
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES)))
);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
transportService = new TransportService(
Settings.EMPTY,
mock(Transport.class),
null,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null,
null,
Collections.emptySet()
);

client = mock(Client.class);
when(client.threadPool()).thenReturn(threadPool);

actionFilters = mock(ActionFilters.class);
adTaskManager = mock(ADTaskManager.class);
action = new DeleteAnomalyDetectorTransportAction(
transportService,
actionFilters,
client,
clusterService,
Settings.EMPTY,
xContentRegistry(),
adTaskManager
);

jobParameter = mock(AnomalyDetectorJob.class);
when(jobParameter.getName()).thenReturn(randomAlphaOfLength(10));
IntervalSchedule schedule = new IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES);
when(jobParameter.getSchedule()).thenReturn(schedule);
when(jobParameter.getWindowDelay()).thenReturn(new IntervalTimeConfiguration(10, ChronoUnit.SECONDS));
}

public void testDeleteADTransportAction_FailDeleteResponse() {
future = mock(PlainActionFuture.class);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest("1234");
setupMocks(true, true, false, false);

action.doExecute(mock(Task.class), request, future);
verify(adTaskManager).deleteADTasks(eq("1234"), any(), any());
verify(client, times(1)).delete(any(), any());
verify(future).onFailure(any(OpenSearchStatusException.class));
}

public void testDeleteADTransportAction_NullAnomalyDetector() {
future = mock(PlainActionFuture.class);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest("1234");
setupMocks(true, false, false, false);

action.doExecute(mock(Task.class), request, future);
verify(adTaskManager).deleteADTasks(eq("1234"), any(), any());
verify(client, times(3)).delete(any(), any());
}

public void testDeleteADTransportAction_DeleteResponseException() {
future = mock(PlainActionFuture.class);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest("1234");
setupMocks(true, false, true, false);

action.doExecute(mock(Task.class), request, future);
verify(adTaskManager).deleteADTasks(eq("1234"), any(), any());
verify(client, times(1)).delete(any(), any());
verify(future).onFailure(any(RuntimeException.class));
}

public void testDeleteADTransportAction_LatestDetectorLevelTask() {
when(clusterService.state()).thenReturn(createClusterState());

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Consumer<Optional<ADTask>> consumer = (Consumer<Optional<ADTask>>) args[2];
ADTask adTask = ADTask.builder().state("RUNNING").build();
consumer.accept(Optional.of(adTask));
return null;
}).when(adTaskManager).getAndExecuteOnLatestDetectorLevelTask(eq("1234"), any(), any(), eq(transportService), eq(true), any());

future = mock(PlainActionFuture.class);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest("1234");
setupMocks(false, false, false, false);

action.doExecute(mock(Task.class), request, future);
verify(future).onFailure(any(OpenSearchStatusException.class));
}

public void testDeleteADTransportAction_JobRunning() {
when(clusterService.state()).thenReturn(createClusterState());
future = mock(PlainActionFuture.class);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest("1234");
setupMocks(false, false, false, false);

action.doExecute(mock(Task.class), request, future);
verify(future).onFailure(any(RuntimeException.class));
}

public void testDeleteADTransportAction_GetResponseException() {
when(clusterService.state()).thenReturn(createClusterState());
future = mock(PlainActionFuture.class);
DeleteAnomalyDetectorRequest request = new DeleteAnomalyDetectorRequest("1234");
setupMocks(false, false, false, true);

action.doExecute(mock(Task.class), request, future);
verify(client).get(any(), any());
verify(client).get(any(), any());
}

private ClusterState createClusterState() {
ImmutableOpenMap<String, IndexMetadata> immutableOpenMap = ImmutableOpenMap
.<String, IndexMetadata>builder()
.fPut(
ANOMALY_DETECTOR_JOB_INDEX,
IndexMetadata
.builder("test")
.settings(
Settings
.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 1)
.put("index.version.created", Version.CURRENT.id)
)
.build()
)
.build();
Metadata metaData = Metadata.builder().indices(immutableOpenMap).build();
ClusterState clusterState = new ClusterState(new ClusterName("test_name"), 1l, "uuid", metaData, null, null, null, null, 1, true);
return clusterState;
}

private void setupMocks(
boolean nullAnomalyDetectorResponse,
boolean failDeleteDeleteResponse,
boolean deleteResponseException,
boolean getResponseFailure
) {
doAnswer(invocation -> {
Object[] args = invocation.getArguments();
Consumer<Optional<AnomalyDetector>> consumer = (Consumer<Optional<AnomalyDetector>>) args[1];
if (nullAnomalyDetectorResponse) {
consumer.accept(Optional.empty());
} else {
AnomalyDetector ad = mock(AnomalyDetector.class);
consumer.accept(Optional.of(ad));
}
return null;
}).when(adTaskManager).getDetector(any(), any(), any());

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
AnomalyDetectorFunction function = (AnomalyDetectorFunction) args[1];

function.execute();
return null;
}).when(adTaskManager).deleteADTasks(eq("1234"), any(), any());

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
ActionListener<DeleteResponse> listener = (ActionListener<DeleteResponse>) args[1];
deleteResponse = mock(DeleteResponse.class);
if (deleteResponseException) {
listener.onFailure(new RuntimeException("Failed to delete anomaly detector job"));
return null;
}
if (failDeleteDeleteResponse) {
doReturn(DocWriteResponse.Result.CREATED).when(deleteResponse).getResult();
} else {
doReturn(DocWriteResponse.Result.DELETED).when(deleteResponse).getResult();
}
listener.onResponse(deleteResponse);
return null;
}).when(client).delete(any(), any());

doAnswer(invocation -> {
Object[] args = invocation.getArguments();
ActionListener<GetResponse> listener = (ActionListener<GetResponse>) args[1];
if (getResponseFailure) {
listener.onFailure(new RuntimeException("Fail to get anomaly detector job"));
return null;
}
getResponse = new GetResponse(
new GetResult(
AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX,
"id",
UNASSIGNED_SEQ_NO,
0,
-1,
true,
BytesReference
.bytes(
new AnomalyDetectorJob(
"1234",
jobParameter.getSchedule(),
jobParameter.getWindowDelay(),
true,
Instant.now().minusSeconds(60),
Instant.now(),
Instant.now(),
60L,
TestHelpers.randomUser(),
jobParameter.getResultIndex()
).toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS)
),
Collections.emptyMap(),
Collections.emptyMap()
)
);
listener.onResponse(getResponse);
return null;
}).when(client).get(any(), any());
}
}
Loading

0 comments on commit 9a25a4d

Please sign in to comment.