diff --git a/.github/workflows/test_build_multi_platform.yml b/.github/workflows/test_build_multi_platform.yml index ffc2aa8a3..84943de99 100644 --- a/.github/workflows/test_build_multi_platform.yml +++ b/.github/workflows/test_build_multi_platform.yml @@ -11,7 +11,7 @@ jobs: Build-ad-windows: strategy: matrix: - java: [ 11, 17 ] + java: [ 11, 17, 20 ] name: Build and Test Anomaly Detection Plugin on Windows runs-on: windows-latest steps: @@ -39,7 +39,7 @@ jobs: Build-ad: strategy: matrix: - java: [11,17] + java: [11,17,20] os: [ubuntu-latest, macos-latest] fail-fast: false diff --git a/build.gradle b/build.gradle index c0278088e..55faa4625 100644 --- a/build.gradle +++ b/build.gradle @@ -64,8 +64,8 @@ buildscript { } plugins { - id 'nebula.ospackage' version "8.3.0" apply false - id "com.diffplug.gradle.spotless" version "3.26.1" + id 'com.netflix.nebula.ospackage' version "11.0.0" + id "com.diffplug.spotless" version "6.18.0" id 'java-library' // Gradle 7.6 support was added in test-retry 1.4.0. id 'org.gradle.test-retry' version '1.4.1' @@ -149,22 +149,17 @@ dependencies { } testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1' - testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.25.0' - testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.2' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.3.1' testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.0.1' - testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.9.15' - testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.9.15' + testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.14.6' + testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.14.6' testCompileOnly 'org.apiguardian:apiguardian-api:1.1.0' - testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.2' - testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.2' - testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.7.2' + // jupiter is required to run unit tests not inherited from OpenSearchTestCase (e.g., PreviousValueImputerTests) + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter-params:5.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.8.2' testImplementation "org.opensearch:opensearch-core:${opensearch_version}" - testRuntimeOnly 'org.junit.vintage:junit-vintage-engine:5.7.2' + testRuntimeOnly 'org.junit.vintage:junit-vintage-engine:5.8.2' testCompileOnly 'junit:junit:4.13.2' } @@ -190,6 +185,9 @@ allprojects { plugins.withId('java') { sourceCompatibility = targetCompatibility = JavaVersion.VERSION_11 } + plugins.withId('jacoco') { + jacoco.toolVersion = '0.8.10' + } } ext { @@ -215,10 +213,10 @@ configurations.all { force "org.apache.httpcomponents.client5:httpclient5:${versions.httpclient5}" force "commons-codec:commons-codec:${versions.commonscodec}" - force "org.mockito:mockito-core:2.25.0" + force "org.mockito:mockito-core:5.3.1" force "org.objenesis:objenesis:3.0.1" - force "net.bytebuddy:byte-buddy:1.9.15" - force "net.bytebuddy:byte-buddy-agent:1.9.15" + force "net.bytebuddy:byte-buddy:1.14.6" + force "net.bytebuddy:byte-buddy-agent:1.14.6" force "com.google.code.gson:gson:2.8.9" force "junit:junit:4.13.2" } @@ -302,6 +300,14 @@ test { excludeTestsMatching "org.opensearch.ad.ml.HCADModelPerfTests" } } + + /* Gradle 8 is including some of its own internal JARs into the test classpath, and there's + overlap with the dependencies org.junit.vintage:junit-vintage-engine pulling in. To prevent + jar hell, exclude these problematic JARs. */ + classpath = classpath.filter { + !it.toString().contains("junit-platform-engine-1.8.2.jar") && + !it.toString().contains("junit-platform-commons-1.8.2.jar") + } } task integTest(type: RestIntegTestTask) { @@ -711,8 +717,8 @@ jacocoTestCoverageVerification { jacocoTestReport { reports { - xml.enabled = true - html.enabled = true + xml.required = true // for coverlay + html.required = true // human readable } } @@ -722,10 +728,11 @@ jacocoTestCoverageVerification.dependsOn jacocoTestReport compileJava.options.compilerArgs << "-Xlint:-deprecation,-rawtypes,-serial,-try,-unchecked" test { + // required to run unit tests not inherited from OpenSearchTestCase (e.g., PreviousValueImputerTests) useJUnitPlatform() } -apply plugin: 'nebula.ospackage' +apply plugin: 'com.netflix.nebula.ospackage' // This is afterEvaluate because the bundlePlugin ZIP task is updated afterEvaluate and changes the ZIP name to match the plugin name afterEvaluate { @@ -735,7 +742,7 @@ afterEvaluate { version = "${project.version}" - "-SNAPSHOT" into '/usr/share/opensearch/plugins' - from(zipTree(bundlePlugin.archivePath)) { + from(zipTree(bundlePlugin.archiveFile)) { into opensearchplugin.name } @@ -766,9 +773,8 @@ afterEvaluate { task renameRpm(type: Copy) { from("$buildDir/distributions") into("$buildDir/distributions") - include archiveName - rename archiveName, "${packageName}-${version}.rpm" - doLast { delete file("$buildDir/distributions/$archiveName") } + rename "$archiveFileName", "${packageName}-${archiveVersion}.rpm" + doLast { delete file("$buildDir/distributions/$archiveFileName") } } } @@ -779,9 +785,8 @@ afterEvaluate { task renameDeb(type: Copy) { from("$buildDir/distributions") into("$buildDir/distributions") - include archiveName - rename archiveName, "${packageName}-${version}.deb" - doLast { delete file("$buildDir/distributions/$archiveName") } + rename "$archiveFileName", "${packageName}-${archiveVersion}.deb" + doLast { delete file("$buildDir/distributions/$archiveFileName") } } } diff --git a/gradle.properties b/gradle.properties new file mode 100644 index 000000000..d2eba77fc --- /dev/null +++ b/gradle.properties @@ -0,0 +1,30 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +# Enable build caching +org.gradle.caching=true +org.gradle.warning.mode=none +org.gradle.parallel=true +org.gradle.jvmargs=-Xmx3g -XX:+HeapDumpOnOutOfMemoryError -Xss2m \ + --add-exports jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED \ + --add-exports jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED \ + --add-exports jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED \ + --add-exports jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED \ + --add-exports jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED +options.forkOptions.memoryMaximumSize=3g + +# Disable duplicate project id detection +# See https://docs.gradle.org/current/userguide/upgrading_version_6.html#duplicate_project_names_may_cause_publication_to_fail +systemProp.org.gradle.dependency.duplicate.project.detection=false + +# Enforce the build to fail on deprecated gradle api usage +systemProp.org.gradle.warning.mode=fail + +# forcing to use TLS1.2 to avoid failure in vault +# see https://github.com/hashicorp/vault/issues/8750#issuecomment-631236121 +systemProp.jdk.tls.client.protocols=TLSv1.2 + +# jvm args for faster test execution by default +systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 943f0cbfa..033e24c4c 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 508322917..9f4197d5f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,7 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.2.1-bin.zip networkTimeout=10000 +validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 65dcd68d6..fcb6fca14 100755 --- a/gradlew +++ b/gradlew @@ -85,9 +85,6 @@ done APP_BASE_NAME=${0##*/} APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' - # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,10 +130,13 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. @@ -144,7 +144,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac @@ -152,7 +152,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then '' | soft) :;; #( *) # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -197,6 +197,10 @@ if "$cygwin" || "$msys" ; then done fi + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + # Collect all arguments for the java command; # * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of # shell script including quotes and variable substitutions, so put them in diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java index ceb39914e..98135e1ee 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java @@ -307,38 +307,11 @@ private void runAnomalyDetectionJob( detectionStartTime.toEpochMilli(), executionStartTime.toEpochMilli() ); - client - .execute( - AnomalyResultAction.INSTANCE, - request, - ActionListener - .wrap( - response -> { - indexAnomalyResult( - jobParameter, - lockService, - lock, - detectionStartTime, - executionStartTime, - response, - recorder, - detector - ); - }, - exception -> { - handleAdException( - jobParameter, - lockService, - lock, - detectionStartTime, - executionStartTime, - exception, - recorder, - detector - ); - } - ) - ); + client.execute(AnomalyResultAction.INSTANCE, request, ActionListener.wrap(response -> { + indexAnomalyResult(jobParameter, lockService, lock, detectionStartTime, executionStartTime, response, recorder, detector); + }, exception -> { + handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception, recorder, detector); + })); } catch (Exception e) { indexAnomalyResultException( jobParameter, @@ -672,11 +645,9 @@ private void releaseLock(Job jobParameter, LockService lockService, LockModel lo lockService .release( lock, - ActionListener - .wrap( - released -> { log.info("Released lock for AD job {}", jobParameter.getName()); }, - exception -> { log.error("Failed to release lock for AD job: " + jobParameter.getName(), exception); } - ) + ActionListener.wrap(released -> { log.info("Released lock for AD job {}", jobParameter.getName()); }, exception -> { + log.error("Failed to release lock for AD job: " + jobParameter.getName(), exception); + }) ); } } diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java index fe0a0047e..c5336316c 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java @@ -84,11 +84,9 @@ public void executeDetector( listener.onResponse(Collections.emptyList()); return; } - ActionListener entityAnomalyResultListener = ActionListener - .wrap( - entityAnomalyResult -> { listener.onResponse(entityAnomalyResult.getAnomalyResults()); }, - e -> onFailure(e, listener, detector.getId()) - ); + ActionListener entityAnomalyResultListener = ActionListener.wrap(entityAnomalyResult -> { + listener.onResponse(entityAnomalyResult.getAnomalyResults()); + }, e -> onFailure(e, listener, detector.getId())); MultiResponsesDelegateActionListener multiEntitiesResponseListener = new MultiResponsesDelegateActionListener( entityAnomalyResultListener, diff --git a/src/main/java/org/opensearch/ad/caching/PriorityCache.java b/src/main/java/org/opensearch/ad/caching/PriorityCache.java index 175d77e64..40e28975d 100644 --- a/src/main/java/org/opensearch/ad/caching/PriorityCache.java +++ b/src/main/java/org/opensearch/ad/caching/PriorityCache.java @@ -133,12 +133,9 @@ public PriorityCache( Duration inactiveEntityTtl = DateUtils.toDuration(checkpointTtl.get(settings)); this.inActiveEntities = createInactiveCache(inactiveEntityTtl, maxInactiveStates); - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - checkpointTtl, - it -> { this.inActiveEntities = createInactiveCache(DateUtils.toDuration(it), maxInactiveStates); } - ); + clusterService.getClusterSettings().addSettingsUpdateConsumer(checkpointTtl, it -> { + this.inActiveEntities = createInactiveCache(DateUtils.toDuration(it), maxInactiveStates); + }); this.threadPool = threadPool; this.random = new Random(42); @@ -163,19 +160,15 @@ public ModelState get(String modelId, AnomalyDetector detector) { // during maintenance period, stop putting new entries if (!maintenanceLock.isLocked() && modelState == null) { if (ADEnabledSetting.isDoorKeeperInCacheEnabled()) { - DoorKeeper doorKeeper = doorKeepers - .computeIfAbsent( - detectorId, - id -> { - // reset every 60 intervals - return new DoorKeeper( - TimeSeriesSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, - TimeSeriesSettings.DOOR_KEEPER_FALSE_POSITIVE_RATE, - detector.getIntervalDuration().multipliedBy(TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ), - clock - ); - } + DoorKeeper doorKeeper = doorKeepers.computeIfAbsent(detectorId, id -> { + // reset every 60 intervals + return new DoorKeeper( + TimeSeriesSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, + TimeSeriesSettings.DOOR_KEEPER_FALSE_POSITIVE_RATE, + detector.getIntervalDuration().multipliedBy(TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ), + clock ); + }); // first hit, ignore // since door keeper may get reset during maintenance, it is possible diff --git a/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java b/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java index 91594cf0f..4f629c7bb 100644 --- a/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java +++ b/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java @@ -72,19 +72,9 @@ public void clusterChanged(ClusterChangedEvent event) { if (delta.removed() || delta.added()) { LOG.info(NODE_CHANGED_MSG + ", node removed: {}, node added: {}", delta.removed(), delta.added()); hashRing.addNodeChangeEvent(); - hashRing - .buildCircles( - delta, - ActionListener - .runAfter( - ActionListener - .wrap( - hasRingBuildDone -> { LOG.info("Hash ring build result: {}", hasRingBuildDone); }, - e -> { LOG.error("Failed updating AD version hash ring", e); } - ), - () -> inProgress.release() - ) - ); + hashRing.buildCircles(delta, ActionListener.runAfter(ActionListener.wrap(hasRingBuildDone -> { + LOG.info("Hash ring build result: {}", hasRingBuildDone); + }, e -> { LOG.error("Failed updating AD version hash ring", e); }), () -> inProgress.release())); } else { inProgress.release(); } diff --git a/src/main/java/org/opensearch/ad/cluster/DailyCron.java b/src/main/java/org/opensearch/ad/cluster/DailyCron.java index d0366350d..2692608d2 100644 --- a/src/main/java/org/opensearch/ad/cluster/DailyCron.java +++ b/src/main/java/org/opensearch/ad/cluster/DailyCron.java @@ -58,26 +58,17 @@ public void run() { ) ) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - clientUtil - .execute( - DeleteByQueryAction.INSTANCE, - deleteRequest, - ActionListener - .wrap( - response -> { - // if 0 docs get deleted, it means our query cannot find any matching doc - LOG.info("{} " + CHECKPOINT_DELETED_MSG, response.getDeleted()); - }, - exception -> { - if (exception instanceof IndexNotFoundException) { - LOG.info(CHECKPOINT_NOT_EXIST_MSG); - } else { - // Gonna eventually delete in maintenance window. - LOG.error(CANNOT_DELETE_OLD_CHECKPOINT_MSG, exception); - } - } - ) - ); + clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { + // if 0 docs get deleted, it means our query cannot find any matching doc + LOG.info("{} " + CHECKPOINT_DELETED_MSG, response.getDeleted()); + }, exception -> { + if (exception instanceof IndexNotFoundException) { + LOG.info(CHECKPOINT_NOT_EXIST_MSG); + } else { + // Gonna eventually delete in maintenance window. + LOG.error(CANNOT_DELETE_OLD_CHECKPOINT_MSG, exception); + } + })); } } diff --git a/src/main/java/org/opensearch/ad/cluster/HashRing.java b/src/main/java/org/opensearch/ad/cluster/HashRing.java index ee901efae..30ea1724f 100644 --- a/src/main/java/org/opensearch/ad/cluster/HashRing.java +++ b/src/main/java/org/opensearch/ad/cluster/HashRing.java @@ -177,13 +177,9 @@ public void buildCirclesForRealtimeAD() { if (nodeChangeEvents.isEmpty()) { return; } - buildCircles( - ActionListener - .wrap( - r -> { LOG.debug("build circles on AD versions successfully"); }, - e -> { LOG.error("Failed to build circles on AD versions", e); } - ) - ); + buildCircles(ActionListener.wrap(r -> { LOG.debug("build circles on AD versions successfully"); }, e -> { + LOG.error("Failed to build circles on AD versions", e); + })); } /** diff --git a/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java index 03ed36e76..28fc05e37 100644 --- a/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java +++ b/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -66,12 +66,12 @@ public void run() { .lte(clock.millis() - defaultCheckpointTtl.toMillis()) .format(ADCommonName.EPOCH_MILLIS_FORMAT) ), - ActionListener - .wrap( - response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); }, - // The docs will be deleted in next scheduled windows. No need for retrying. - exception -> LOG.error("delete docs by query fails for checkpoint index", exception) - ) + ActionListener.wrap(response -> { + cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); + }, + // The docs will be deleted in next scheduled windows. No need for retrying. + exception -> LOG.error("delete docs by query fails for checkpoint index", exception) + ) ); } diff --git a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java index 7653d5dcf..95ce25faa 100644 --- a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java +++ b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java @@ -101,9 +101,9 @@ public ADIndexManagement( historyRolloverPeriod = it; rescheduleRollover(); }); - this.clusterService - .getClusterSettings() - .addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { historyRetentionPeriod = it; }); + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { + historyRetentionPeriod = it; + }); this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_MAX_PRIMARY_SHARDS, it -> maxPrimaryShards = it); } diff --git a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java index b1d7db46e..adb097cb6 100644 --- a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java +++ b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java @@ -603,14 +603,9 @@ private void deserializeTRCFModel( } String thresholdingModelId = SingleStreamModelIdMapper.getThresholdModelIdFromRCFModelId(rcfModelId); // query for threshold model and combinne rcf and threshold model into a ThresholdedRandomCutForest - getThresholdModel( - thresholdingModelId, - ActionListener - .wrap( - thresholdingModel -> { listener.onResponse(convertToTRCF(forest, thresholdingModel)); }, - listener::onFailure - ) - ); + getThresholdModel(thresholdingModelId, ActionListener.wrap(thresholdingModel -> { + listener.onResponse(convertToTRCF(forest, thresholdingModel)); + }, listener::onFailure)); } } catch (Exception e) { logger.error(new ParameterizedMessage("Unexpected error when deserializing [{}]", rcfModelId), e); @@ -627,12 +622,9 @@ private void deserializeTRCFModel( * @param listener Listener to return a pair of entity model and its last checkpoint time */ public void deserializeModelCheckpoint(String modelId, ActionListener>> listener) { - clientUtil - .asyncRequest( - new GetRequest(indexName, modelId), - client::get, - ActionListener.wrap(response -> { listener.onResponse(processGetResponse(response, modelId)); }, listener::onFailure) - ); + clientUtil.asyncRequest(new GetRequest(indexName, modelId), client::get, ActionListener.wrap(response -> { + listener.onResponse(processGetResponse(response, modelId)); + }, listener::onFailure)); } /** @@ -661,18 +653,14 @@ public void getTRCFModel(String modelId, ActionListenerasyncRequest( new GetRequest(indexName, modelId), client::get, - ActionListener - .wrap( - response -> deserializeTRCFModel(response, modelId, listener), - exception -> { - // expected exception, don't print stack trace - if (exception instanceof IndexNotFoundException) { - listener.onResponse(Optional.empty()); - } else { - listener.onFailure(exception); - } - } - ) + ActionListener.wrap(response -> deserializeTRCFModel(response, modelId, listener), exception -> { + // expected exception, don't print stack trace + if (exception instanceof IndexNotFoundException) { + listener.onResponse(Optional.empty()); + } else { + listener.onFailure(exception); + } + }) ); } @@ -697,16 +685,14 @@ public void getThresholdModel(String modelId, ActionListener { - // expected exception, don't print stack trace - if (exception instanceof IndexNotFoundException) { - listener.onResponse(Optional.empty()); - } else { - listener.onFailure(exception); - } + }, exception -> { + // expected exception, don't print stack trace + if (exception instanceof IndexNotFoundException) { + listener.onResponse(Optional.empty()); + } else { + listener.onFailure(exception); } - )); + })); } private Optional processThresholdModelCheckpoint(GetResponse response) { diff --git a/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java b/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java index 29de17d02..1044b84ce 100644 --- a/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java +++ b/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java @@ -248,19 +248,15 @@ private void coldStart( boolean earlyExit = true; try { - DoorKeeper doorKeeper = doorKeepers - .computeIfAbsent( - detectorId, - id -> { - // reset every 60 intervals - return new DoorKeeper( - TimeSeriesSettings.DOOR_KEEPER_FOR_COLD_STARTER_MAX_INSERTION, - TimeSeriesSettings.DOOR_KEEPER_FALSE_POSITIVE_RATE, - detector.getIntervalDuration().multipliedBy(TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ), - clock - ); - } + DoorKeeper doorKeeper = doorKeepers.computeIfAbsent(detectorId, id -> { + // reset every 60 intervals + return new DoorKeeper( + TimeSeriesSettings.DOOR_KEEPER_FOR_COLD_STARTER_MAX_INSERTION, + TimeSeriesSettings.DOOR_KEEPER_FALSE_POSITIVE_RATE, + detector.getIntervalDuration().multipliedBy(TimeSeriesSettings.DOOR_KEEPER_MAINTENANCE_FREQ), + clock ); + }); // Won't retry cold start within 60 intervals for an entity if (doorKeeper.mightContain(modelId)) { diff --git a/src/main/java/org/opensearch/ad/ml/ModelManager.java b/src/main/java/org/opensearch/ad/ml/ModelManager.java index 5cc2949a5..14f935aae 100644 --- a/src/main/java/org/opensearch/ad/ml/ModelManager.java +++ b/src/main/java/org/opensearch/ad/ml/ModelManager.java @@ -661,12 +661,16 @@ public Map getModelSize(String detectorId) { .entrySet() .stream() .filter(entry -> SingleStreamModelIdMapper.getDetectorIdForModelId(entry.getKey()).equals(detectorId)) - .forEach(entry -> { res.put(entry.getKey(), memoryTracker.estimateTRCFModelSize(entry.getValue().getModel())); }); + .forEach(entry -> { + res.put(entry.getKey(), memoryTracker.estimateTRCFModelSize(entry.getValue().getModel())); + }); thresholds .entrySet() .stream() .filter(entry -> SingleStreamModelIdMapper.getDetectorIdForModelId(entry.getKey()).equals(detectorId)) - .forEach(entry -> { res.put(entry.getKey(), (long) memoryTracker.getThresholdModelBytes()); }); + .forEach(entry -> { + res.put(entry.getKey(), (long) memoryTracker.getThresholdModelBytes()); + }); return res; } diff --git a/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java index 102b3e3fd..614d47bee 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java @@ -904,20 +904,14 @@ protected void validateAnomalyDetectorFeatures(String detectorId, boolean indexi return; } // checking runtime error from feature query - ActionListener>> validateFeatureQueriesListener = ActionListener - .wrap( - response -> { checkADNameExists(detectorId, indexingDryRun); }, - exception -> { - listener - .onFailure( - new ValidationException( - exception.getMessage(), - ValidationIssueType.FEATURE_ATTRIBUTES, - ValidationAspect.DETECTOR - ) - ); - } - ); + ActionListener>> validateFeatureQueriesListener = ActionListener.wrap(response -> { + checkADNameExists(detectorId, indexingDryRun); + }, exception -> { + listener + .onFailure( + new ValidationException(exception.getMessage(), ValidationIssueType.FEATURE_ATTRIBUTES, ValidationAspect.DETECTOR) + ); + }); MultiResponsesDelegateActionListener>> multiFeatureQueriesResponseListener = new MultiResponsesDelegateActionListener>>( validateFeatureQueriesListener, diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 9b234d3b4..fd59759fc 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -252,21 +252,12 @@ private void onGetAnomalyDetectorJobForWrite( ); // Get latest realtime task and check its state before index job. Will reset running realtime task // as STOPPED first if job disabled, then start new job and create new realtime task. - adTaskManager - .startDetector( - detector, - null, - job.getUser(), - transportService, - ActionListener - .wrap( - r -> { indexAnomalyDetectorJob(newJob, null, listener); }, - e -> { - // Have logged error message in ADTaskManager#startDetector - listener.onFailure(e); - } - ) - ); + adTaskManager.startDetector(detector, null, job.getUser(), transportService, ActionListener.wrap(r -> { + indexAnomalyDetectorJob(newJob, null, listener); + }, e -> { + // Have logged error message in ADTaskManager#startDetector + listener.onFailure(e); + })); } } catch (IOException e) { String message = "Failed to parse anomaly detector job " + job.getName(); @@ -274,14 +265,9 @@ private void onGetAnomalyDetectorJobForWrite( listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)); } } else { - adTaskManager - .startDetector( - detector, - null, - job.getUser(), - transportService, - ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null, listener); }, e -> listener.onFailure(e)) - ); + adTaskManager.startDetector(detector, null, job.getUser(), transportService, ActionListener.wrap(r -> { + indexAnomalyDetectorJob(job, null, listener); + }, e -> listener.onFailure(e))); } } diff --git a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java index 4b152d93e..1f26d6bbd 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java @@ -731,16 +731,12 @@ private void processTopEntityResults(SearchResponse response, long latestTime) { } private void checkFeatureQueryDelegate(long latestTime) throws IOException { - ActionListener> validateFeatureQueriesListener = ActionListener - .wrap( - response -> { windowDelayRecommendation(latestTime); }, - exception -> { - listener - .onFailure( - new ValidationException(exception.getMessage(), ValidationIssueType.FEATURE_ATTRIBUTES, ValidationAspect.MODEL) - ); - } - ); + ActionListener> validateFeatureQueriesListener = ActionListener.wrap(response -> { + windowDelayRecommendation(latestTime); + }, exception -> { + listener + .onFailure(new ValidationException(exception.getMessage(), ValidationIssueType.FEATURE_ATTRIBUTES, ValidationAspect.MODEL)); + }); MultiResponsesDelegateActionListener> multiFeatureQueriesResponseListener = new MultiResponsesDelegateActionListener<>( validateFeatureQueriesListener, diff --git a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java index e4d28957a..f25b09af4 100644 --- a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java +++ b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java @@ -606,11 +606,9 @@ public void forwardOrExecuteADTask( .updateADTask( adTask.getTaskId(), updatedFields, - ActionListener - .wrap( - r -> forwardOrExecuteEntityTask(adTask, transportService, workerNodeResponseListener), - e -> { workerNodeResponseListener.onFailure(e); } - ) + ActionListener.wrap(r -> forwardOrExecuteEntityTask(adTask, transportService, workerNodeResponseListener), e -> { + workerNodeResponseListener.onFailure(e); + }) ); } } catch (Exception e) { diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index d6a2c7242..454e5cfc8 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -254,18 +254,9 @@ public ADTaskManager( .withType(TransportRequestOptions.Type.REG) .withTimeout(AD_REQUEST_TIMEOUT.get(settings)) .build(); - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - AD_REQUEST_TIMEOUT, - it -> { - transportRequestOptions = TransportRequestOptions - .builder() - .withType(TransportRequestOptions.Type.REG) - .withTimeout(it) - .build(); - } - ); + clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_REQUEST_TIMEOUT, it -> { + transportRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout(it).build(); + }); this.threadPool = threadPool; this.checkingTaskSlot = new Semaphore(1); this.scaleEntityTaskLane = new Semaphore(1); @@ -1242,16 +1233,12 @@ private void stopHistoricalAnalysis( String userName = user == null ? null : user.getName(); ADCancelTaskRequest cancelTaskRequest = new ADCancelTaskRequest(detectorId, taskId, userName, dataNodes); - client - .execute( - ADCancelTaskAction.INSTANCE, - cancelTaskRequest, - ActionListener - .wrap(response -> { listener.onResponse(new AnomalyDetectorJobResponse(taskId, 0, 0, 0, RestStatus.OK)); }, e -> { - logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, e); - listener.onFailure(e); - }) - ); + client.execute(ADCancelTaskAction.INSTANCE, cancelTaskRequest, ActionListener.wrap(response -> { + listener.onResponse(new AnomalyDetectorJobResponse(taskId, 0, 0, 0, RestStatus.OK)); + }, e -> { + logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, e); + listener.onFailure(e); + })); } private boolean lastUpdateTimeOfHistoricalTaskExpired(ADTask adTask) { @@ -1360,16 +1347,9 @@ public void cleanDetectorCache( protected void cleanDetectorCache(ADTask adTask, TransportService transportService, ExecutorFunction function) { String detectorId = adTask.getConfigId(); String taskId = adTask.getTaskId(); - cleanDetectorCache( - adTask, - transportService, - function, - ActionListener - .wrap( - r -> { logger.debug("Successfully cleaned cache for detector {}, task {}", detectorId, taskId); }, - e -> { logger.error("Failed to clean cache for detector " + detectorId + ", task " + taskId, e); } - ) - ); + cleanDetectorCache(adTask, transportService, function, ActionListener.wrap(r -> { + logger.debug("Successfully cleaned cache for detector {}, task {}", detectorId, taskId); + }, e -> { logger.error("Failed to clean cache for detector " + detectorId + ", task " + taskId, e); })); } /** @@ -1841,14 +1821,9 @@ public void updateADTask(String taskId, Map updatedFields, Actio * @param taskId AD task id */ public void deleteADTask(String taskId) { - deleteADTask( - taskId, - ActionListener - .wrap( - r -> { logger.info("Deleted AD task {} with status: {}", taskId, r.status()); }, - e -> { logger.error("Failed to delete AD task " + taskId, e); } - ) - ); + deleteADTask(taskId, ActionListener.wrap(r -> { logger.info("Deleted AD task {} with status: {}", taskId, r.status()); }, e -> { + logger.error("Failed to delete AD task " + taskId, e); + })); } /** @@ -1926,16 +1901,12 @@ private void deleteADResultOfDetector(String detectorId) { logger.info("Start to delete AD results of detector {}", detectorId); DeleteByQueryRequest deleteADResultsRequest = new DeleteByQueryRequest(ALL_AD_RESULTS_INDEX_PATTERN); deleteADResultsRequest.setQuery(new TermQueryBuilder(DETECTOR_ID_FIELD, detectorId)); - client - .execute( - DeleteByQueryAction.INSTANCE, - deleteADResultsRequest, - ActionListener - .wrap(response -> { logger.debug("Successfully deleted AD results of detector " + detectorId); }, exception -> { - logger.error("Failed to delete AD results of detector " + detectorId, exception); - adTaskCacheManager.addDeletedConfig(detectorId); - }) - ); + client.execute(DeleteByQueryAction.INSTANCE, deleteADResultsRequest, ActionListener.wrap(response -> { + logger.debug("Successfully deleted AD results of detector " + detectorId); + }, exception -> { + logger.error("Failed to delete AD results of detector " + detectorId, exception); + adTaskCacheManager.addDeletedConfig(detectorId); + })); } /** @@ -2498,15 +2469,9 @@ public void runNextEntityForHCADHistorical( ActionListener listener ) { String detectorId = adTask.getConfigId(); - int scaleDelta = scaleTaskSlots( - adTask, - transportService, - ActionListener - .wrap( - r -> { logger.debug("Scale up task slots done for detector {}, task {}", detectorId, adTask.getTaskId()); }, - e -> { logger.error("Failed to scale up task slots for task " + adTask.getTaskId(), e); } - ) - ); + int scaleDelta = scaleTaskSlots(adTask, transportService, ActionListener.wrap(r -> { + logger.debug("Scale up task slots done for detector {}, task {}", detectorId, adTask.getTaskId()); + }, e -> { logger.error("Failed to scale up task slots for task " + adTask.getTaskId(), e); })); if (scaleDelta < 0) { logger .warn( @@ -2733,16 +2698,12 @@ public ADTaskProfile getLocalADTaskProfilesByDetectorId(String detectorId) { detectorTaskProfile.setDetectorTaskSlots(1); } } - threadPool - .executor(AD_BATCH_TASK_THREAD_POOL_NAME) - .execute( - () -> { - // Clean expired HC batch task run states as it may exists after HC historical analysis done if user cancel - // before querying top entities done. We will clean it in hourly cron, check "maintainRunningHistoricalTasks" - // method. Clean it up here when get task profile to release memory earlier. - adTaskCacheManager.cleanExpiredHCBatchTaskRunStates(); - } - ); + threadPool.executor(AD_BATCH_TASK_THREAD_POOL_NAME).execute(() -> { + // Clean expired HC batch task run states as it may exists after HC historical analysis done if user cancel + // before querying top entities done. We will clean it in hourly cron, check "maintainRunningHistoricalTasks" + // method. Clean it up here when get task profile to release memory earlier. + adTaskCacheManager.cleanExpiredHCBatchTaskRunStates(); + }); logger.debug("Local AD task profile of detector {}: {}", detectorId, detectorTaskProfile); return detectorTaskProfile; } @@ -3049,21 +3010,9 @@ private void maintainRunningHistoricalTask(ConcurrentLinkedQueue taskQue resetHistoricalDetectorTaskState(ImmutableList.of(adTask), () -> { logger.debug("Finished maintaining running historical task {}", adTask.getTaskId()); maintainRunningHistoricalTask(taskQueue, transportService); - }, - transportService, - ActionListener - .wrap( - r -> { - logger - .debug( - "Reset historical task state done for task {}, detector {}", - adTask.getTaskId(), - adTask.getConfigId() - ); - }, - e -> { logger.error("Failed to reset historical task state for task " + adTask.getTaskId(), e); } - ) - ); + }, transportService, ActionListener.wrap(r -> { + logger.debug("Reset historical task state done for task {}, detector {}", adTask.getTaskId(), adTask.getConfigId()); + }, e -> { logger.error("Failed to reset historical task state for task " + adTask.getTaskId(), e); })); }, TimeValue.timeValueSeconds(DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), AD_BATCH_TASK_THREAD_POOL_NAME); } diff --git a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java index 9b8caba19..221a935bc 100644 --- a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java @@ -155,25 +155,17 @@ private void deleteAnomalyDetectorJobDoc(String detectorId, ActionListener listener) { LOG.info("Delete detector info {}", detectorId); DeleteRequest deleteRequest = new DeleteRequest(ADCommonName.DETECTION_STATE_INDEX, detectorId); - client - .delete( - deleteRequest, - ActionListener - .wrap( - response -> { - // whether deleted state doc or not, continue as state doc may not exist - deleteAnomalyDetectorDoc(detectorId, listener); - }, - exception -> { - if (exception instanceof IndexNotFoundException) { - deleteAnomalyDetectorDoc(detectorId, listener); - } else { - LOG.error("Failed to delete detector state", exception); - listener.onFailure(exception); - } - } - ) - ); + client.delete(deleteRequest, ActionListener.wrap(response -> { + // whether deleted state doc or not, continue as state doc may not exist + deleteAnomalyDetectorDoc(detectorId, listener); + }, exception -> { + if (exception instanceof IndexNotFoundException) { + deleteAnomalyDetectorDoc(detectorId, listener); + } else { + LOG.error("Failed to delete detector state", exception); + listener.onFailure(exception); + } + })); } private void deleteAnomalyDetectorDoc(String detectorId, ActionListener listener) { diff --git a/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java index 7005c5905..3b040c9e1 100644 --- a/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java @@ -172,37 +172,27 @@ protected void getExecute(GetAnomalyDetectorRequest request, ActionListener { - listener - .onResponse( - new GetAnomalyDetectorResponse( - 0, - null, - 0, - 0, - null, - null, - false, - null, - null, - false, - null, - null, - profile, - true - ) - ); - }, - e -> listener.onFailure(e) + profileRunner.profile(detectorID, entity, entityProfilesToCollect, ActionListener.wrap(profile -> { + listener + .onResponse( + new GetAnomalyDetectorResponse( + 0, + null, + 0, + 0, + null, + null, + false, + null, + null, + false, + null, + null, + profile, + true ) - ); + ); + }, e -> listener.onFailure(e))); } else { Set profilesToCollect = getProfilesToCollect(typesStr, all); AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner( diff --git a/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java index 55fd33865..2ae1f93d5 100644 --- a/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java @@ -185,20 +185,12 @@ void multiSearch( context.restore(); // Send multiple search to check which index a user has permission to read. If search all indices directly, // search request will throw exception if user has no permission to search any index. - client - .multiSearch( - multiSearchRequest, - ActionListener - .wrap( - multiSearchResponse -> { - processMultiSearchResponse(multiSearchResponse, targetIndices, readableIndices, request, listener); - }, - multiSearchException -> { - logger.error("Failed to search custom AD result indices", multiSearchException); - listener.onFailure(multiSearchException); - } - ) - ); + client.multiSearch(multiSearchRequest, ActionListener.wrap(multiSearchResponse -> { + processMultiSearchResponse(multiSearchResponse, targetIndices, readableIndices, request, listener); + }, multiSearchException -> { + logger.error("Failed to search custom AD result indices", multiSearchException); + listener.onFailure(multiSearchException); + })); } @VisibleForTesting diff --git a/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java b/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java index a9552671d..9d539f797 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java @@ -193,41 +193,34 @@ protected void save(T toSave, String detectorId, String indexName) { } void saveIteration(IndexRequest indexRequest, String detectorId, Iterator backoff) { - clientUtil - .asyncRequest( - indexRequest, - client::index, - ActionListener - .wrap( - response -> { LOG.debug(String.format(Locale.ROOT, SUCCESS_SAVING_MSG, detectorId)); }, - exception -> { - // OpenSearch has a thread pool and a queue for write per node. A thread - // pool will have N number of workers ready to handle the requests. When a - // request comes and if a worker is free , this is handled by the worker. Now by - // default the number of workers is equal to the number of cores on that CPU. - // When the workers are full and there are more write requests, the request - // will go to queue. The size of queue is also limited. If by default size is, - // say, 200 and if there happens more parallel requests than this, then those - // requests would be rejected as you can see OpenSearchRejectedExecutionException. - // So OpenSearchRejectedExecutionException is the way that OpenSearch tells us that - // it cannot keep up with the current indexing rate. - // When it happens, we should pause indexing a bit before trying again, ideally - // with randomized exponential backoff. - Throwable cause = ExceptionsHelper.unwrapCause(exception); - if (!(cause instanceof OpenSearchRejectedExecutionException) || !backoff.hasNext()) { - LOG.error(String.format(Locale.ROOT, FAIL_TO_SAVE_ERR_MSG, detectorId), cause); - } else { - TimeValue nextDelay = backoff.next(); - LOG.warn(String.format(Locale.ROOT, RETRY_SAVING_ERR_MSG, detectorId), cause); - threadPool - .schedule( - () -> saveIteration(BulkUtil.cloneIndexRequest(indexRequest), detectorId, backoff), - nextDelay, - ThreadPool.Names.SAME - ); - } - } - ) - ); + clientUtil.asyncRequest(indexRequest, client::index, ActionListener.wrap(response -> { + LOG.debug(String.format(Locale.ROOT, SUCCESS_SAVING_MSG, detectorId)); + }, exception -> { + // OpenSearch has a thread pool and a queue for write per node. A thread + // pool will have N number of workers ready to handle the requests. When a + // request comes and if a worker is free , this is handled by the worker. Now by + // default the number of workers is equal to the number of cores on that CPU. + // When the workers are full and there are more write requests, the request + // will go to queue. The size of queue is also limited. If by default size is, + // say, 200 and if there happens more parallel requests than this, then those + // requests would be rejected as you can see OpenSearchRejectedExecutionException. + // So OpenSearchRejectedExecutionException is the way that OpenSearch tells us that + // it cannot keep up with the current indexing rate. + // When it happens, we should pause indexing a bit before trying again, ideally + // with randomized exponential backoff. + Throwable cause = ExceptionsHelper.unwrapCause(exception); + if (!(cause instanceof OpenSearchRejectedExecutionException) || !backoff.hasNext()) { + LOG.error(String.format(Locale.ROOT, FAIL_TO_SAVE_ERR_MSG, detectorId), cause); + } else { + TimeValue nextDelay = backoff.next(); + LOG.warn(String.format(Locale.ROOT, RETRY_SAVING_ERR_MSG, detectorId), cause); + threadPool + .schedule( + () -> saveIteration(BulkUtil.cloneIndexRequest(indexRequest), detectorId, backoff), + nextDelay, + ThreadPool.Names.SAME + ); + } + })); } } diff --git a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java index 234f4c8c8..f27aa749e 100644 --- a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java +++ b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java @@ -98,9 +98,9 @@ public ForecastIndexManagement( historyRolloverPeriod = it; rescheduleRollover(); }); - this.clusterService - .getClusterSettings() - .addSettingsUpdateConsumer(FORECAST_RESULT_HISTORY_RETENTION_PERIOD, it -> { historyRetentionPeriod = it; }); + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(FORECAST_RESULT_HISTORY_RETENTION_PERIOD, it -> { + historyRetentionPeriod = it; + }); this.clusterService.getClusterSettings().addSettingsUpdateConsumer(FORECAST_MAX_PRIMARY_SHARDS, it -> maxPrimaryShards = it); diff --git a/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java b/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java index 2b4e3493d..1ce44472f 100644 --- a/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java +++ b/src/main/java/org/opensearch/timeseries/feature/SearchFeatureDao.java @@ -495,8 +495,9 @@ public void getMinDataTime(Config config, Optional entity, AnalysisType .trackTotalHits(false) .size(0); SearchRequest searchRequest = new SearchRequest().indices(config.getIndices().toArray(new String[0])).source(searchSourceBuilder); - final ActionListener searchResponseListener = ActionListener - .wrap(response -> { listener.onResponse(parseMinDataTime(response)); }, listener::onFailure); + final ActionListener searchResponseListener = ActionListener.wrap(response -> { + listener.onResponse(parseMinDataTime(response)); + }, listener::onFailure); // inject user role while searching. clientUtil .asyncRequestWithInjectedSecurity( @@ -554,11 +555,9 @@ public void getFeaturesForPeriodByBatch( logger.debug("Batch query for detector {}: {} ", detector.getId(), searchSourceBuilder); SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0])).source(searchSourceBuilder); - final ActionListener searchResponseListener = ActionListener - .wrap( - response -> { listener.onResponse(parseBucketAggregationResponse(response, detector.getEnabledFeatureIds())); }, - listener::onFailure - ); + final ActionListener searchResponseListener = ActionListener.wrap(response -> { + listener.onResponse(parseBucketAggregationResponse(response, detector.getEnabledFeatureIds())); + }, listener::onFailure); // inject user role while searching. clientUtil .asyncRequestWithInjectedSecurity( diff --git a/src/main/java/org/opensearch/timeseries/util/ClientUtil.java b/src/main/java/org/opensearch/timeseries/util/ClientUtil.java index b04a8fbac..394065b2c 100644 --- a/src/main/java/org/opensearch/timeseries/util/ClientUtil.java +++ b/src/main/java/org/opensearch/timeseries/util/ClientUtil.java @@ -61,11 +61,8 @@ public void exe Request request, ActionListener listener ) { - client - .execute( - action, - request, - ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { listener.onFailure(exception); }) - ); + client.execute(action, request, ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { + listener.onFailure(exception); + })); } } diff --git a/src/main/java/org/opensearch/timeseries/util/ExceptionUtil.java b/src/main/java/org/opensearch/timeseries/util/ExceptionUtil.java index 82ed31b84..e4b7c751e 100644 --- a/src/main/java/org/opensearch/timeseries/util/ExceptionUtil.java +++ b/src/main/java/org/opensearch/timeseries/util/ExceptionUtil.java @@ -150,11 +150,9 @@ public static boolean isRetryAble(RestStatus status) { * @return the wrapped listener */ public static ActionListener wrapListener(ActionListener original, Exception exceptionToReturn, String detectorId) { - return ActionListener - .wrap( - r -> { original.onFailure(exceptionToReturn); }, - e -> { original.onFailure(selectHigherPriorityException(exceptionToReturn, e)); } - ); + return ActionListener.wrap(r -> { original.onFailure(exceptionToReturn); }, e -> { + original.onFailure(selectHigherPriorityException(exceptionToReturn, e)); + }); } /** diff --git a/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java b/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java index 640901a36..88546e5ce 100644 --- a/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java +++ b/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java @@ -13,7 +13,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; diff --git a/src/test/java/org/opensearch/ad/cluster/HashRingTests.java b/src/test/java/org/opensearch/ad/cluster/HashRingTests.java index b18efd6bd..69bb38a57 100644 --- a/src/test/java/org/opensearch/ad/cluster/HashRingTests.java +++ b/src/test/java/org/opensearch/ad/cluster/HashRingTests.java @@ -208,7 +208,9 @@ public void testBuildAndGetOwningNodeWithSameLocalAdVersion() { .buildAndGetOwningNodeWithSameLocalAdVersion( "testModelId", node -> { assertTrue(node.isPresent()); }, - ActionListener.wrap(r -> {}, e -> { assertFalse("Failed to build hash ring", true); }) + ActionListener.wrap(r -> {}, e -> { + assertFalse("Failed to build hash ring", true); + }) ); } diff --git a/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java index 2830b5640..0748fe122 100644 --- a/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java +++ b/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -107,13 +107,9 @@ public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsTrue() throws Excep public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsFalse() throws Exception { long maxShardSize = 1000; when(storeStats.getSizeInBytes()).thenReturn(maxShardSize - 1); - indexCleanup - .deleteDocsBasedOnShardSize( - "indexname", - maxShardSize, - null, - ActionListener.wrap(Assert::assertFalse, exception -> { throw new RuntimeException(exception); }) - ); + indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, ActionListener.wrap(Assert::assertFalse, exception -> { + throw new RuntimeException(exception); + })); } public void testDeleteDocsBasedOnShardSizeIndexNotExisted() throws Exception { diff --git a/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java b/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java index 95e667ff9..b78647c11 100644 --- a/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java @@ -19,8 +19,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -46,9 +46,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -68,6 +65,9 @@ import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.IntervalTimeConfiguration; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) @SuppressWarnings("unchecked") public class FeatureManagerTests { diff --git a/src/test/java/org/opensearch/ad/feature/FeaturesTests.java b/src/test/java/org/opensearch/ad/feature/FeaturesTests.java index 447bdd6c4..7a6b3b8e1 100644 --- a/src/test/java/org/opensearch/ad/feature/FeaturesTests.java +++ b/src/test/java/org/opensearch/ad/feature/FeaturesTests.java @@ -18,12 +18,12 @@ import java.util.List; import java.util.Map.Entry; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class FeaturesTests { diff --git a/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java index 313800385..eb68bbe7f 100644 --- a/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -77,24 +77,14 @@ public void testAnomalyDetectorIndexExists() throws IOException { } public void testAnomalyDetectorIndexExistsAndNotRecreate() throws IOException { - indices - .initConfigIndexIfAbsent( - TestHelpers - .createActionListener( - response -> response.isAcknowledged(), - failure -> { throw new RuntimeException("should not recreate index"); } - ) - ); + indices.initConfigIndexIfAbsent(TestHelpers.createActionListener(response -> response.isAcknowledged(), failure -> { + throw new RuntimeException("should not recreate index"); + })); TestHelpers.waitForIndexCreationToComplete(client(), CommonName.CONFIG_INDEX); if (client().admin().indices().prepareExists(CommonName.CONFIG_INDEX).get().isExists()) { - indices - .initConfigIndexIfAbsent( - TestHelpers - .createActionListener( - response -> { throw new RuntimeException("should not recreate index " + CommonName.CONFIG_INDEX); }, - failure -> { throw new RuntimeException("should not recreate index " + CommonName.CONFIG_INDEX); } - ) - ); + indices.initConfigIndexIfAbsent(TestHelpers.createActionListener(response -> { + throw new RuntimeException("should not recreate index " + CommonName.CONFIG_INDEX); + }, failure -> { throw new RuntimeException("should not recreate index " + CommonName.CONFIG_INDEX); })); } } @@ -114,26 +104,16 @@ public void testAnomalyResultIndexExists() throws IOException { public void testAnomalyResultIndexExistsAndNotRecreate() throws IOException { indices .initDefaultResultIndexIfAbsent( - TestHelpers - .createActionListener( - response -> logger.info("Acknowledged: " + response.isAcknowledged()), - failure -> { throw new RuntimeException("should not recreate index"); } - ) + TestHelpers.createActionListener(response -> logger.info("Acknowledged: " + response.isAcknowledged()), failure -> { + throw new RuntimeException("should not recreate index"); + }) ); TestHelpers.waitForIndexCreationToComplete(client(), ADCommonName.ANOMALY_RESULT_INDEX_ALIAS); if (client().admin().indices().prepareExists(ADCommonName.ANOMALY_RESULT_INDEX_ALIAS).get().isExists()) { - indices - .initDefaultResultIndexIfAbsent( - TestHelpers - .createActionListener( - response -> { - throw new RuntimeException("should not recreate index " + ADCommonName.ANOMALY_RESULT_INDEX_ALIAS); - }, - failure -> { - throw new RuntimeException("should not recreate index " + ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, failure); - } - ) - ); + indices.initDefaultResultIndexIfAbsent(TestHelpers.createActionListener(response -> { + throw new RuntimeException("should not recreate index " + ADCommonName.ANOMALY_RESULT_INDEX_ALIAS); + }, failure -> { throw new RuntimeException("should not recreate index " + ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, failure); }) + ); } } diff --git a/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java b/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java index 489d683b8..72358af10 100644 --- a/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java +++ b/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java @@ -11,8 +11,8 @@ package org.opensearch.ad.ml; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -105,10 +105,6 @@ import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.ClientUtil; -import test.org.opensearch.ad.util.JsonDeserializer; -import test.org.opensearch.ad.util.MLUtil; -import test.org.opensearch.ad.util.RandomModelStateConfig; - import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.config.Precision; import com.amazon.randomcutforest.config.TransformMethod; @@ -124,6 +120,9 @@ import io.protostuff.LinkedBuffer; import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; +import test.org.opensearch.ad.util.JsonDeserializer; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; public class CheckpointDaoTests extends OpenSearchTestCase { private static final Logger logger = LogManager.getLogger(CheckpointDaoTests.class); diff --git a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java index f838da051..188146f69 100644 --- a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java +++ b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java @@ -62,16 +62,16 @@ import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.settings.TimeSeriesSettings; -import test.org.opensearch.ad.util.LabelledAnomalyGenerator; -import test.org.opensearch.ad.util.MLUtil; -import test.org.opensearch.ad.util.MultiDimDataWithTime; - import com.amazon.randomcutforest.config.Precision; import com.amazon.randomcutforest.config.TransformMethod; import com.amazon.randomcutforest.parkservices.AnomalyDescriptor; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; import com.google.common.collect.ImmutableList; +import test.org.opensearch.ad.util.LabelledAnomalyGenerator; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.MultiDimDataWithTime; + public class EntityColdStarterTests extends AbstractCosineDataTest { @BeforeClass diff --git a/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java b/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java index f0e87b160..bf2732777 100644 --- a/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java +++ b/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java @@ -51,12 +51,12 @@ import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.settings.TimeSeriesSettings; -import test.org.opensearch.ad.util.LabelledAnomalyGenerator; -import test.org.opensearch.ad.util.MultiDimDataWithTime; - import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; import com.google.common.collect.ImmutableList; +import test.org.opensearch.ad.util.LabelledAnomalyGenerator; +import test.org.opensearch.ad.util.MultiDimDataWithTime; + @TimeoutSuite(millis = 60 * TimeUnits.MINUTE) // rcf may be slow due to bounding box cache disabled public class HCADModelPerfTests extends AbstractCosineDataTest { diff --git a/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java b/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java index be8ff9525..5e6ffdb9b 100644 --- a/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java +++ b/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java @@ -17,13 +17,13 @@ import java.util.Arrays; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.apache.commons.math3.distribution.NormalDistribution; import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class HybridThresholdingModelTests { diff --git a/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java b/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java index eda0cfb46..cbb7b09ba 100644 --- a/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java +++ b/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java @@ -17,7 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -44,9 +44,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,14 +77,16 @@ import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; -import test.org.opensearch.ad.util.MLUtil; -import test.org.opensearch.ad.util.RandomModelStateConfig; - import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.parkservices.AnomalyDescriptor; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; import com.amazon.randomcutforest.returntypes.DiVector; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; + @RunWith(JUnitParamsRunner.class) @SuppressWarnings("unchecked") public class ModelManagerTests { diff --git a/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java b/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java index 492bbec45..111041858 100644 --- a/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java +++ b/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java @@ -13,13 +13,13 @@ import static org.junit.Assert.assertEquals; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Test; import org.junit.runner.RunWith; import org.opensearch.ad.model.AnomalyResult; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class ThresholdingResultTests { diff --git a/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java b/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java index 07c116774..f8c54b112 100644 --- a/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java +++ b/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java @@ -122,11 +122,9 @@ protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener .execute( BulkAction.INSTANCE, bulkRequest, - ActionListener - .wrap( - res -> { listener.onResponse(mockBulkByScrollResponse(totalHits)); }, - ex -> { listener.onFailure(ex); } - ) + ActionListener.wrap(res -> { listener.onResponse(mockBulkByScrollResponse(totalHits)); }, ex -> { + listener.onFailure(ex); + }) ); }, e -> { listener.onFailure(e); })); diff --git a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java index ae8ba3a54..41b8035b0 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java @@ -79,11 +79,11 @@ import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.stats.StatNames; +import com.fasterxml.jackson.core.JsonParseException; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.fasterxml.jackson.core.JsonParseException; - public class CheckpointReadWorkerTests extends AbstractRateLimitingTest { CheckpointReadWorker worker; diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index 691e6b439..084e2d44f 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -1169,7 +1169,7 @@ public void testDeleteAnomalyDetectorWhileRunning() throws Exception { Assert.assertNotNull(detector.getId()); Instant now = Instant.now(); Response response = startAnomalyDetector(detector.getId(), new DateRange(now.minus(10, ChronoUnit.DAYS), now), client()); - Assert.assertThat(response.getStatusLine().toString(), CoreMatchers.containsString("200 OK")); + org.hamcrest.MatcherAssert.assertThat(response.getStatusLine().toString(), CoreMatchers.containsString("200 OK")); // Deleting detector should fail while its running Exception exception = expectThrows(IOException.class, () -> { deleteAnomalyDetector(detector.getId(), client()); }); @@ -1505,75 +1505,62 @@ public void testSearchTopAnomalyResultsWithInvalidInputs() throws IOException { ); // Missing start time - Exception missingStartTimeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId(), false, "{\"end_time_ms\":2}", client()); } - ); + Exception missingStartTimeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), false, "{\"end_time_ms\":2}", client()); + }); assertTrue(missingStartTimeException.getMessage().contains("Must set both start time and end time with epoch of milliseconds")); // Missing end time - Exception missingEndTimeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1}", client()); } - ); + Exception missingEndTimeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1}", client()); + }); assertTrue(missingEndTimeException.getMessage().contains("Must set both start time and end time with epoch of milliseconds")); // Start time > end time - Exception invalidTimeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":2, \"end_time_ms\":1}", client()); } - ); + Exception invalidTimeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":2, \"end_time_ms\":1}", client()); + }); assertTrue(invalidTimeException.getMessage().contains("Start time should be before end time")); // Invalid detector ID - Exception invalidDetectorIdException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId() + "-invalid", false, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); } - ); + Exception invalidDetectorIdException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId() + "-invalid", false, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); + }); assertTrue(invalidDetectorIdException.getMessage().contains("Can't find config with id")); // Invalid order field - Exception invalidOrderException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults( - detector.getId(), - false, - "{\"start_time_ms\":1, \"end_time_ms\":2, \"order\":\"invalid-order\"}", - client() - ); - } - ); + Exception invalidOrderException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults( + detector.getId(), + false, + "{\"start_time_ms\":1, \"end_time_ms\":2, \"order\":\"invalid-order\"}", + client() + ); + }); assertTrue(invalidOrderException.getMessage().contains("Ordering by invalid-order is not a valid option")); // Negative size field - Exception negativeSizeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":-1}", client()); } - ); + Exception negativeSizeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":-1}", client()); + }); assertTrue(negativeSizeException.getMessage().contains("Size must be a positive integer")); // Zero size field - Exception zeroSizeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":0}", client()); } - ); + Exception zeroSizeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":0}", client()); + }); assertTrue(zeroSizeException.getMessage().contains("Size must be a positive integer")); // Too large size field - Exception tooLargeSizeException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":9999999}", client()); - } - ); + Exception tooLargeSizeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":9999999}", client()); + }); assertTrue(tooLargeSizeException.getMessage().contains("Size cannot exceed")); // No existing task ID for detector - Exception noTaskIdException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getId(), true, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); } - ); + Exception noTaskIdException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getId(), true, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); + }); assertTrue(noTaskIdException.getMessage().contains("No historical tasks found for detector ID " + detector.getId())); // Invalid category fields @@ -1603,12 +1590,9 @@ public void testSearchTopAnomalyResultsWithInvalidInputs() throws IOException { true, client() ); - Exception noCategoryFieldsException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults(detectorWithNoCategoryFields.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); - } - ); + Exception noCategoryFieldsException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detectorWithNoCategoryFields.getId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); + }); assertTrue( noCategoryFieldsException .getMessage() diff --git a/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java b/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java index 9dab61e84..dcadb41ac 100644 --- a/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java +++ b/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java @@ -333,10 +333,9 @@ public void testStartApiFilterByEnabled() throws IOException { // User Cat has AD full access, but is part of different backend role so Cat should not be able to access // Alice detector Instant now = Instant.now(); - Exception exception = expectThrows( - IOException.class, - () -> { startAnomalyDetector(aliceDetector.getId(), new DateRange(now.minus(10, ChronoUnit.DAYS), now), catClient); } - ); + Exception exception = expectThrows(IOException.class, () -> { + startAnomalyDetector(aliceDetector.getId(), new DateRange(now.minus(10, ChronoUnit.DAYS), now), catClient); + }); Assert.assertTrue(exception.getMessage().contains("User does not have permissions to access detector: " + aliceDetector.getId())); } diff --git a/src/test/java/org/opensearch/ad/stats/ADStatsTests.java b/src/test/java/org/opensearch/ad/stats/ADStatsTests.java index 00f9836c7..6db1ac5cc 100644 --- a/src/test/java/org/opensearch/ad/stats/ADStatsTests.java +++ b/src/test/java/org/opensearch/ad/stats/ADStatsTests.java @@ -46,11 +46,11 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.timeseries.stats.StatNames; +import com.amazon.randomcutforest.RandomCutForest; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.amazon.randomcutforest.RandomCutForest; - public class ADStatsTests extends OpenSearchTestCase { private Map> statsMap; diff --git a/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java b/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java index a95a61b81..21a9e4aff 100644 --- a/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java +++ b/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java @@ -41,11 +41,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; +import com.amazon.randomcutforest.RandomCutForest; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.amazon.randomcutforest.RandomCutForest; - public class ModelsOnNodeSupplierTests extends OpenSearchTestCase { private RandomCutForest rcf; private HybridThresholdingModel thresholdingModel; diff --git a/src/test/java/org/opensearch/ad/transport/ADStatsTests.java b/src/test/java/org/opensearch/ad/transport/ADStatsTests.java index e2edac7ea..4836825f3 100644 --- a/src/test/java/org/opensearch/ad/transport/ADStatsTests.java +++ b/src/test/java/org/opensearch/ad/transport/ADStatsTests.java @@ -48,11 +48,11 @@ import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.stats.StatNames; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class ADStatsTests extends OpenSearchTestCase { String node1, nodeName1, clusterName; Map clusterStats; diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index 79002ab44..50765deb8 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -509,13 +509,9 @@ private long getExecutingADTask() { adStatsRequest.addAll(validStats); StatsAnomalyDetectorResponse statsResponse = client().execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest).actionGet(5000); AtomicLong totalExecutingTask = new AtomicLong(0); - statsResponse - .getAdStatsResponse() - .getADStatsNodesResponse() - .getNodes() - .forEach( - node -> { totalExecutingTask.getAndAdd((Long) node.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())); } - ); + statsResponse.getAdStatsResponse().getADStatsNodesResponse().getNodes().forEach(node -> { + totalExecutingTask.getAndAdd((Long) node.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())); + }); return totalExecutingTask.get(); } } diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java index 1de86c710..1b23b6d51 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java @@ -129,10 +129,10 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class AnomalyResultTests extends AbstractTimeSeriesTest { private Settings settings; private TransportService transportService; diff --git a/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java index 06f375053..7c3de7ed2 100644 --- a/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java @@ -43,10 +43,10 @@ import org.opensearch.timeseries.NodeStateManager; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class CronTransportActionTests extends AbstractTimeSeriesTest { private CronTransportAction action; private String localNodeID; diff --git a/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java index 62429f962..b76925492 100644 --- a/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java @@ -48,10 +48,10 @@ import org.opensearch.timeseries.NodeStateManager; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class DeleteModelTransportActionTests extends AbstractTimeSeriesTest { private DeleteModelTransportAction action; private String localNodeID; diff --git a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java index d3cc0ab4b..f7eb2c8e9 100644 --- a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java @@ -91,13 +91,13 @@ import org.opensearch.timeseries.stats.StatNames; import org.opensearch.transport.TransportService; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; + import test.org.opensearch.ad.util.JsonDeserializer; import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; - public class EntityResultTransportActionTests extends AbstractTimeSeriesTest { EntityResultTransportAction entityResult; ActionFilters actionFilters; diff --git a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java index 9cc58bb8a..2a0b677ed 100644 --- a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java @@ -12,32 +12,45 @@ package org.opensearch.ad.transport; import java.io.IOException; +import java.util.Collection; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.DetectorProfile; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.rest.RestStatus; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.timeseries.TestHelpers; +import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; +import org.opensearch.timeseries.model.Feature; import org.opensearch.timeseries.model.Job; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -@RunWith(PowerMockRunner.class) -@PrepareForTest(GetAnomalyDetectorResponse.class) -public class GetAnomalyDetectorActionTests { - @Before - public void setUp() throws Exception { +import com.google.common.collect.ImmutableList; +/** + * Need to extend from OpenSearchSingleNodeTestCase and override getPlugins and writeableRegistry + * for testGetResponse. Without it, we will have exception "can't read named writeable from StreamInput" + * when deserializing AnomalyDetector. + * + */ +public class GetAnomalyDetectorActionTests extends OpenSearchSingleNodeTestCase { + @Override + protected Collection> getPlugins() { + return pluginList(InternalSettingsPlugin.class, TimeSeriesAnalyticsPlugin.class); + } + + @Override + protected NamedWriteableRegistry writableRegistry() { + return getInstanceFromNode(NamedWriteableRegistry.class); } - @Test public void testGetRequest() throws IOException { BytesStreamOutput out = new BytesStreamOutput(); GetAnomalyDetectorRequest request = new GetAnomalyDetectorRequest("1234", 4321, false, false, "nonempty", "", false, null); @@ -48,12 +61,12 @@ public void testGetRequest() throws IOException { } - @Test public void testGetResponse() throws Exception { BytesStreamOutput out = new BytesStreamOutput(); - AnomalyDetector detector = Mockito.mock(AnomalyDetector.class); + Feature feature = TestHelpers.randomFeature(true); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(ImmutableList.of(feature)); // Mockito.mock(AnomalyDetector.class); Job detectorJob = Mockito.mock(Job.class); - Mockito.doNothing().when(detector).writeTo(out); + // Mockito.doNothing().when(detector).writeTo(out); GetAnomalyDetectorResponse response = new GetAnomalyDetectorResponse( 1234, "4567", @@ -71,8 +84,9 @@ public void testGetResponse() throws Exception { false ); response.writeTo(out); - StreamInput input = out.bytes().streamInput(); - PowerMockito.whenNew(AnomalyDetector.class).withAnyArguments().thenReturn(detector); + // StreamInput input = out.bytes().streamInput(); + NamedWriteableAwareStreamInput input = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), writableRegistry()); + // PowerMockito.whenNew(AnomalyDetector.class).withAnyArguments().thenReturn(detector); GetAnomalyDetectorResponse newResponse = new GetAnomalyDetectorResponse(input); Assert.assertNotNull(newResponse); } diff --git a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index bc5691748..94e07fe3c 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -15,8 +15,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -132,11 +132,11 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; +import com.google.common.collect.ImmutableList; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.google.common.collect.ImmutableList; - public class MultiEntityResultTests extends AbstractTimeSeriesTest { private AnomalyResultTransportAction action; private AnomalyResultRequest request; @@ -581,8 +581,9 @@ private SearchResponse createEmptyResponse() { when(emptyComposite.getName()).thenReturn(CompositeRetriever.AGG_NAME_COMP); when(emptyComposite.afterKey()).thenReturn(null); // empty bucket - when(emptyComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return new ArrayList(); }); + when(emptyComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return new ArrayList(); + }); Aggregations emptyAggs = new Aggregations(Collections.singletonList(emptyComposite)); SearchResponseSections emptySections = new SearchResponseSections(SearchHits.empty(), emptyAggs, null, false, null, null, 1); return new SearchResponse(emptySections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); @@ -1014,8 +1015,9 @@ public void testNullFeatures() throws InterruptedException { when(emptyComposite.getName()).thenReturn(null); when(emptyComposite.afterKey()).thenReturn(null); // empty bucket - when(emptyComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return new ArrayList(); }); + when(emptyComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return new ArrayList(); + }); Aggregations emptyAggs = new Aggregations(Collections.singletonList(emptyComposite)); SearchResponseSections emptySections = new SearchResponseSections(SearchHits.empty(), emptyAggs, null, false, null, null, 1); SearchResponse nullResponse = new SearchResponse(emptySections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); @@ -1053,8 +1055,9 @@ public void testRetry() throws IOException, InterruptedException { when(emptyNonNullComposite.afterKey()).thenReturn(attrs3); List emptyNonNullCompositeBuckets = new ArrayList<>(); - when(emptyNonNullComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return emptyNonNullCompositeBuckets; }); + when(emptyNonNullComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return emptyNonNullCompositeBuckets; + }); Aggregations emptyNonNullAggs = new Aggregations(Collections.singletonList(emptyNonNullComposite)); diff --git a/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java index c9553e1fb..38cdce966 100644 --- a/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java @@ -14,7 +14,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -181,7 +180,7 @@ public void onFailure(Exception e) { ActionListener listener = responseMock.getArgument(3); listener.onResponse(TestHelpers.randomFeatures()); return null; - }).when(featureManager).getPreviewFeatures(anyObject(), anyLong(), anyLong(), any()); + }).when(featureManager).getPreviewFeatures(any(), anyLong(), anyLong(), any()); action.doExecute(task, request, previewResponse); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } @@ -381,7 +380,7 @@ public void onFailure(Exception e) { ActionListener listener = responseMock.getArgument(3); listener.onResponse(TestHelpers.randomFeatures()); return null; - }).when(featureManager).getPreviewFeatures(anyObject(), anyLong(), anyLong(), any()); + }).when(featureManager).getPreviewFeatures(any(), anyLong(), anyLong(), any()); action.doExecute(task, request, previewResponse); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } diff --git a/src/test/java/org/opensearch/ad/transport/ProfileTests.java b/src/test/java/org/opensearch/ad/transport/ProfileTests.java index 7d15fac77..7df0d5e02 100644 --- a/src/test/java/org/opensearch/ad/transport/ProfileTests.java +++ b/src/test/java/org/opensearch/ad/transport/ProfileTests.java @@ -43,11 +43,11 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.timeseries.constant.CommonName; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class ProfileTests extends OpenSearchTestCase { String node1, nodeName1, clusterName; String node2, nodeName2; diff --git a/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java b/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java index 9ba12535f..8cb592927 100644 --- a/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java +++ b/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java @@ -54,12 +54,12 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.FakeNode; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import test.org.opensearch.ad.util.FakeNode; +import test.org.opensearch.ad.util.JsonDeserializer; + public class RCFPollingTests extends AbstractTimeSeriesTest { Gson gson = new GsonBuilder().create(); private String detectorId = "jqIG6XIBEyaF3zCMZfcB"; diff --git a/src/test/java/org/opensearch/ad/transport/RCFResultTests.java b/src/test/java/org/opensearch/ad/transport/RCFResultTests.java index c520b94bf..23e5db59c 100644 --- a/src/test/java/org/opensearch/ad/transport/RCFResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/RCFResultTests.java @@ -61,11 +61,11 @@ import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import test.org.opensearch.ad.util.JsonDeserializer; + public class RCFResultTests extends OpenSearchTestCase { Gson gson = new GsonBuilder().create(); diff --git a/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java b/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java index cda346059..cdc19a4ac 100644 --- a/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java +++ b/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java @@ -99,24 +99,15 @@ public void testForecastResultIndexExists() throws IOException { public void testForecastResultIndexExistsAndNotRecreate() throws IOException { indices .initDefaultResultIndexIfAbsent( - TestHelpers - .createActionListener( - response -> logger.info("Acknowledged: " + response.isAcknowledged()), - failure -> { throw new RuntimeException("should not recreate index"); } - ) + TestHelpers.createActionListener(response -> logger.info("Acknowledged: " + response.isAcknowledged()), failure -> { + throw new RuntimeException("should not recreate index"); + }) ); TestHelpers.waitForIndexCreationToComplete(client(), ForecastIndex.RESULT.getIndexName()); if (client().admin().indices().prepareExists(ForecastIndex.RESULT.getIndexName()).get().isExists()) { - indices - .initDefaultResultIndexIfAbsent( - TestHelpers - .createActionListener( - response -> { throw new RuntimeException("should not recreate index " + ForecastIndex.RESULT.getIndexName()); }, - failure -> { - throw new RuntimeException("should not recreate index " + ForecastIndex.RESULT.getIndexName(), failure); - } - ) - ); + indices.initDefaultResultIndexIfAbsent(TestHelpers.createActionListener(response -> { + throw new RuntimeException("should not recreate index " + ForecastIndex.RESULT.getIndexName()); + }, failure -> { throw new RuntimeException("should not recreate index " + ForecastIndex.RESULT.getIndexName(), failure); })); } } @@ -168,11 +159,9 @@ public void testCustomResultIndexExists() throws IOException { indices .initCustomResultIndexDirectly( indexName, - TestHelpers - .createActionListener( - response -> logger.info("Acknowledged: " + response.isAcknowledged()), - failure -> { throw new RuntimeException("should not recreate index"); } - ) + TestHelpers.createActionListener(response -> logger.info("Acknowledged: " + response.isAcknowledged()), failure -> { + throw new RuntimeException("should not recreate index"); + }) ); TestHelpers.waitForIndexCreationToComplete(client(), indexName); assertTrue((client().admin().indices().prepareExists(indexName).get().isExists())); @@ -331,11 +320,9 @@ public void testInitCustomResultIndexAndExecuteIndex() throws InterruptedExcepti indices .initCustomResultIndexDirectly( indexName, - TestHelpers - .createActionListener( - response -> logger.info("Acknowledged: " + response.isAcknowledged()), - failure -> { throw new RuntimeException("should not recreate index"); } - ) + TestHelpers.createActionListener(response -> logger.info("Acknowledged: " + response.isAcknowledged()), failure -> { + throw new RuntimeException("should not recreate index"); + }) ); TestHelpers.waitForIndexCreationToComplete(client(), indexName); CountDownLatch latch = new CountDownLatch(1); diff --git a/src/test/java/org/opensearch/timeseries/NodeStateManagerTests.java b/src/test/java/org/opensearch/timeseries/NodeStateManagerTests.java index 27d57d62a..e52255818 100644 --- a/src/test/java/org/opensearch/timeseries/NodeStateManagerTests.java +++ b/src/test/java/org/opensearch/timeseries/NodeStateManagerTests.java @@ -13,12 +13,11 @@ import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import java.io.IOException; @@ -235,7 +234,7 @@ public void testShouldMute() { public void testMaintenanceDoNothing() { stateManager.maintenance(); - verifyZeroInteractions(clock); + verifyNoInteractions(clock); } public void testGetAnomalyDetector() throws IOException, InterruptedException { diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java index a95327d7c..23a5150cc 100644 --- a/src/test/java/org/opensearch/timeseries/TestHelpers.java +++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java @@ -12,13 +12,13 @@ package org.opensearch.timeseries; import static org.apache.hc.core5.http.ContentType.APPLICATION_JSON; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.opensearch.test.OpenSearchTestCase.*; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/src/test/java/org/opensearch/timeseries/dataprocessor/SingleFeatureLinearUniformImputerTests.java b/src/test/java/org/opensearch/timeseries/dataprocessor/SingleFeatureLinearUniformImputerTests.java index 17aae0422..0bf7bdffb 100644 --- a/src/test/java/org/opensearch/timeseries/dataprocessor/SingleFeatureLinearUniformImputerTests.java +++ b/src/test/java/org/opensearch/timeseries/dataprocessor/SingleFeatureLinearUniformImputerTests.java @@ -15,13 +15,13 @@ import java.util.Arrays; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class SingleFeatureLinearUniformImputerTests { diff --git a/src/test/java/org/opensearch/timeseries/dataprocessor/ZeroImputerTests.java b/src/test/java/org/opensearch/timeseries/dataprocessor/ZeroImputerTests.java index a13189db2..8e03821e2 100644 --- a/src/test/java/org/opensearch/timeseries/dataprocessor/ZeroImputerTests.java +++ b/src/test/java/org/opensearch/timeseries/dataprocessor/ZeroImputerTests.java @@ -7,13 +7,13 @@ import static org.junit.Assert.assertArrayEquals; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class ZeroImputerTests { diff --git a/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java b/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java index a35ddc08b..aa8ba932c 100644 --- a/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java +++ b/src/test/java/org/opensearch/timeseries/feature/NoPowermockSearchFeatureDaoTests.java @@ -12,6 +12,7 @@ package org.opensearch.timeseries.feature; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -344,8 +345,9 @@ public void testGetHighestCountEntitiesExhaustedPages() throws InterruptedExcept when(emptyComposite.getName()).thenReturn(SearchFeatureDao.AGG_NAME_TOP); when(emptyComposite.afterKey()).thenReturn(null); // empty bucket - when(emptyComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return new ArrayList(); }); + when(emptyComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return new ArrayList(); + }); Aggregations emptyAggs = new Aggregations(Collections.singletonList(emptyComposite)); SearchResponseSections emptySections = new SearchResponseSections(SearchHits.empty(), emptyAggs, null, false, null, null, 1); SearchResponse emptyResponse = new SearchResponse(emptySections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); diff --git a/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoParamTests.java b/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoParamTests.java index 920374b2f..6d67b3f72 100644 --- a/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoParamTests.java +++ b/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoParamTests.java @@ -35,9 +35,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.junit.Test; @@ -55,8 +52,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; -import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.index.query.QueryBuilders; @@ -70,7 +65,6 @@ import org.opensearch.search.aggregations.metrics.InternalTDigestPercentiles; import org.opensearch.search.aggregations.metrics.Max; import org.opensearch.search.aggregations.metrics.Percentile; -import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.NodeStateManager; @@ -80,15 +74,10 @@ import org.opensearch.timeseries.dataprocessor.LinearUniformImputer; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.settings.TimeSeriesSettings; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.modules.junit4.PowerMockRunnerDelegate; -import com.google.gson.Gson; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; /** * Due to https://tinyurl.com/2y265s2w, tests with and without @Parameters annotation @@ -96,10 +85,7 @@ * while SearchFeatureDaoTests do not use @Parameters. * */ -@PowerMockIgnore("javax.management.*") -@RunWith(PowerMockRunner.class) -@PowerMockRunnerDelegate(JUnitParamsRunner.class) -@PrepareForTest({ ParseUtils.class, Gson.class }) +@RunWith(JUnitParamsRunner.class) public class SearchFeatureDaoParamTests { private SearchFeatureDao searchFeatureDao; @@ -146,7 +132,6 @@ public class SearchFeatureDaoParamTests { private Clock clock; private SearchRequest searchRequest; - private SearchSourceBuilder searchSourceBuilder; private MultiSearchRequest multiSearchRequest; private IntervalTimeConfiguration detectionInterval; private String detectorId; @@ -156,7 +141,7 @@ public class SearchFeatureDaoParamTests { @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); - PowerMockito.mockStatic(ParseUtils.class); + // PowerMockito.mockStatic(ParseUtils.class); imputer = new LinearUniformImputer(false); @@ -192,8 +177,6 @@ public void setup() throws Exception { when(detector.getFilterQuery()).thenReturn(QueryBuilders.matchAllQuery()); when(detector.getCategoryFields()).thenReturn(Collections.singletonList("a")); - searchSourceBuilder = SearchSourceBuilder - .fromXContent(XContentType.JSON.xContent().createParser(xContent, LoggingDeprecationHandler.INSTANCE, "{}")); searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0])); when(max.getName()).thenReturn(CommonName.AGG_NAME_MAX_TIME); @@ -233,14 +216,13 @@ public void getFeaturesForPeriod_returnExpectedToListener(List aggs long start = 100L; long end = 200L; - when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); when(searchResponse.getAggregations()).thenReturn(new Aggregations(aggs)); when(detector.getEnabledFeatureIds()).thenReturn(featureIds); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; - }).when(client).search(eq(searchRequest), any(ActionListener.class)); + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); diff --git a/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoTests.java b/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoTests.java index 99b57b506..9731d31b5 100644 --- a/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoTests.java +++ b/src/test/java/org/opensearch/timeseries/feature/SearchFeatureDaoTests.java @@ -12,6 +12,7 @@ package org.opensearch.timeseries.feature; import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.AnyOf.anyOf; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -44,7 +45,6 @@ import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -79,6 +79,7 @@ import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.opensearch.search.aggregations.metrics.InternalMax; import org.opensearch.search.aggregations.metrics.InternalMin; import org.opensearch.search.aggregations.metrics.InternalTDigestPercentiles; import org.opensearch.search.aggregations.metrics.Max; @@ -96,16 +97,8 @@ import org.opensearch.timeseries.model.Entity; import org.opensearch.timeseries.model.IntervalTimeConfiguration; import org.opensearch.timeseries.settings.TimeSeriesSettings; -import org.opensearch.timeseries.util.ParseUtils; import org.opensearch.timeseries.util.SecurityClientUtil; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -@PowerMockIgnore("javax.management.*") -@RunWith(PowerMockRunner.class) -@PrepareForTest({ ParseUtils.class }) + public class SearchFeatureDaoTests { private SearchFeatureDao searchFeatureDao; @@ -159,7 +152,7 @@ public class SearchFeatureDaoTests { @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); - PowerMockito.mockStatic(ParseUtils.class); + // PowerMockito.mockStatic(ParseUtils.class); imputer = new LinearUniformImputer(false); @@ -265,7 +258,10 @@ public void getLatestDataTime_returnExpectedToListener() { return null; }).when(client).search(eq(searchRequest), any(ActionListener.class)); - when(ParseUtils.getLatestDataTime(eq(searchResponse))).thenReturn(Optional.of(epochTime)); + InternalMax maxAgg = new InternalMax(CommonName.AGG_NAME_MAX_TIME, epochTime, DocValueFormat.RAW, emptyMap()); + InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(maxAgg)); + when(searchResponse.getAggregations()).thenReturn(internalAggregations); + ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getLatestDataTime(detector, listener); @@ -296,13 +292,12 @@ public void getFeaturesForPeriod_throwToListener_whenResponseParsingFails() thro long start = 100L; long end = 200L; - when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); when(detector.getEnabledFeatureIds()).thenReturn(null); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; - }).when(client).search(eq(searchRequest), any(ActionListener.class)); + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); @@ -316,12 +311,11 @@ public void getFeaturesForPeriod_throwToListener_whenSearchFails() throws Except long start = 100L; long end = 200L; - when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onFailure(new RuntimeException()); return null; - }).when(client).search(eq(searchRequest), any(ActionListener.class)); + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener);