diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml new file mode 100644 index 000000000..e3f96a44f --- /dev/null +++ b/.github/workflows/backport.yml @@ -0,0 +1,29 @@ + +name: Backport +on: + pull_request_target: + types: + - closed + - labeled + +jobs: + backport: + runs-on: ubuntu-latest + permissions: + contents: write + pull-requests: write + name: Backport + steps: + - name: GitHub App token + id: github_app_token + uses: tibdex/github-app-token@v1.5.0 + with: + app_id: ${{ secrets.APP_ID }} + private_key: ${{ secrets.APP_PRIVATE_KEY }} + installation_id: 22958780 + + - name: Backport + uses: VachaShah/backport@v1.1.4 + with: + github_token: ${{ steps.github_app_token.outputs.token }} + branch_name: backport/backport-${{ github.event.number }} diff --git a/.github/workflows/delete_backport_branch.yml b/.github/workflows/delete_backport_branch.yml new file mode 100644 index 000000000..f24f022b0 --- /dev/null +++ b/.github/workflows/delete_backport_branch.yml @@ -0,0 +1,15 @@ +name: Delete merged branch of the backport PRs +on: + pull_request: + types: + - closed + +jobs: + delete-branch: + runs-on: ubuntu-latest + if: startsWith(github.event.pull_request.head.ref,'backport/') + steps: + - name: Delete merged branch + uses: SvanBoxel/delete-merged-branch@main + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index 45972e79d..5d443dabc 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [ 14 ] + java: [ 11 ] # Job name name: Build and test Alerting # This job runs on Linux @@ -26,10 +26,5 @@ jobs: # This step uses the checkout Github action: https://github.com/actions/checkout - name: Checkout Branch uses: actions/checkout@v2 - # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 14 - uses: actions/setup-java@v1 - with: - java-version: 14 - name: Run integration tests with multi node config run: ./gradlew integTest -PnumNodes=3 -Dopensearch.version=1.3.0-SNAPSHOT \ No newline at end of file diff --git a/.github/workflows/security-test-workflow.yml b/.github/workflows/security-test-workflow.yml index cc05d44b8..b6624ef2f 100644 --- a/.github/workflows/security-test-workflow.yml +++ b/.github/workflows/security-test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [ 14 ] + java: [ 11 ] # Job name name: Build and test Alerting # This job runs on Linux @@ -27,10 +27,10 @@ jobs: - name: Checkout Branch uses: actions/checkout@v2 # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 14 + - name: Set Up JDK 11 uses: actions/setup-java@v1 with: - java-version: 14 + java-version: 11 - name: Build Alerting # Only assembling since the full build is governed by other workflows run: ./gradlew assemble -Dopensearch.version=1.3.0-SNAPSHOT diff --git a/.github/workflows/test-workflow.yml b/.github/workflows/test-workflow.yml index 459d33adf..59d78a495 100644 --- a/.github/workflows/test-workflow.yml +++ b/.github/workflows/test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [14] + java: [8, 11, 14] # Job name name: Build Alerting with JDK ${{ matrix.java }} # This job runs on Linux diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 282d23e93..e07bd50f7 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -1,6 +1,7 @@ - [Developer Guide](#developer-guide) - [Forking and Cloning](#forking-and-cloning) - [Install Prerequisites](#install-prerequisites) + - [JDK 11](#jdk-11) - [JDK 14](#jdk-14) - [Setup](#setup) - [Build](#build) @@ -18,14 +19,32 @@ Fork this repository on GitHub, and clone locally with `git clone`. ### Install Prerequisites +#### JDK 11 + +OpenSearch builds using Java 11 at a minimum, using the Adoptium distribution. This means you must have a JDK 11 installed with the environment variable `JAVA_HOME` referencing the path to Java home for your JDK 11 installation, e.g. `JAVA_HOME=/usr/lib/jvm/jdk-11`. This is configured in [buildSrc/build.gradle](buildSrc/build.gradle) and [distribution/tools/java-version-checker/build.gradle](distribution/tools/java-version-checker/build.gradle). + +``` +allprojects { + targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_11 +} +``` + +``` +sourceCompatibility = JavaVersion.VERSION_11 +targetCompatibility = JavaVersion.VERSION_11 +``` + +Download Java 11 from [here](https://adoptium.net/releases.html?variant=openjdk11). + #### JDK 14 -OpenSearch components build using Java 14 at a minimum. This means you must have a JDK 14 installed with the environment variable `JAVA_HOME` referencing the path to Java home for your JDK 14 installation, e.g. `JAVA_HOME=/usr/lib/jvm/jdk-14`. +To run the full suite of tests, download and install [JDK 14](https://jdk.java.net/archive/) and set `JAVA11_HOME`, and `JAVA14_HOME`. They are required by the [backwards compatibility test](./TESTING.md#testing-backwards-compatibility). ### Setup 1. Clone the repository (see [Forking and Cloning](#forking-and-cloning)) -2. Make sure `JAVA_HOME` is pointing to a Java 14 JDK (see [Install Prerequisites](#install-prerequisites)) +2. Make sure `JAVA_HOME` is pointing to a Java 11 JDK (see [Install Prerequisites](#install-prerequisites)) 3. Launch Intellij IDEA, Choose Import Project and select the settings.gradle file in the root of this package. ### Build @@ -105,4 +124,9 @@ You can do this by running `./gradlew :alerting:run -PnumNodes=` You can also debug a multi-node cluster, by using a combination of above multi-node and debug steps. -But, you must set up debugger configurations to listen on each port starting from `5005` and increasing by 1 for each node. +But, you must set up debugger configurations to listen on each port starting from `5005` and increasing by 1 for each node. + +### Backport + +- [Link to backport documentation](https://github.com/opensearch-project/opensearch-plugins/blob/main/BACKPORT.md) + diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 1fab8dfbf..7afe7c856 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -2,6 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ + package org.opensearch.alerting import org.opensearch.action.ActionRequest @@ -30,6 +31,7 @@ import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction +import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler @@ -117,7 +119,7 @@ import java.util.function.Supplier * Entry point of the OpenDistro for Elasticsearch alerting plugin * This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers. * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY], - * [BucketLevelTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects. + * [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects. */ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() { @@ -209,7 +211,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, - BucketLevelTrigger.XCONTENT_REGISTRY + BucketLevelTrigger.XCONTENT_REGISTRY, + ClusterMetricsInput.XCONTENT_REGISTRY ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 0ce280ca6..4af66dc7c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.elasticapi.convertToMap import org.opensearch.alerting.elasticapi.suspendUntil @@ -16,6 +17,8 @@ import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.TriggerAfterKey import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.addUserBackendRolesFilter +import org.opensearch.alerting.util.executeTransportAction +import org.opensearch.alerting.util.toMap import org.opensearch.client.Client import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput @@ -85,6 +88,11 @@ class InputService( ) results += searchResponse.convertToMap() } + is ClusterMetricsInput -> { + logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}") + val response = executeTransportAction(input, client) + results += response.toMap() + } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 7a8965751..68bcf5966 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -5,6 +5,7 @@ package org.opensearch.alerting.model +import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.CronSchedule import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.Schedule @@ -15,6 +16,7 @@ import org.opensearch.alerting.elasticapi.optionalTimeField import org.opensearch.alerting.elasticapi.optionalUserField import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS +import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.alerting.util._ID import org.opensearch.alerting.util._VERSION @@ -69,6 +71,8 @@ data class Monitor( require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" } MonitorType.BUCKET_LEVEL_MONITOR -> require(trigger is BucketLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" } + MonitorType.CLUSTER_METRICS_MONITOR -> + require(trigger is QueryLevelTrigger) { "Incompatible trigger [$trigger.id] for monitor type [$monitorType]" } } } if (enabled) { @@ -113,7 +117,8 @@ data class Monitor( // This is different from 'type' which denotes the Scheduled Job type enum class MonitorType(val value: String) { QUERY_LEVEL_MONITOR("query_level_monitor"), - BUCKET_LEVEL_MONITOR("bucket_level_monitor"); + BUCKET_LEVEL_MONITOR("bucket_level_monitor"), + CLUSTER_METRICS_MONITOR("cluster_metrics_monitor"); override fun toString(): String { return value @@ -250,7 +255,10 @@ data class Monitor( INPUTS_FIELD -> { ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_ARRAY) { - inputs.add(Input.parse(xcp)) + val input = Input.parse(xcp) + if (input is ClusterMetricsInput) + SupportedClusterMetricsSettings.validateApiType(input) + inputs.add(input) } } TRIGGERS_FIELD -> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt new file mode 100644 index 000000000..a6a9bf642 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/SupportedClusterMetricsSettings.kt @@ -0,0 +1,138 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.settings + +import org.opensearch.action.ActionRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest +import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest +import org.opensearch.action.admin.indices.recovery.RecoveryRequest +import org.opensearch.alerting.core.model.ClusterMetricsInput +import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.json.JsonXContent + +/** + * A class that supports storing a unique set of API paths that can be accessed by general users. + */ +class SupportedClusterMetricsSettings { + companion object { + const val RESOURCE_FILE = "supported_json_payloads.json" + + /** + * The key in this map represents the path to call an API. + * + * NOTE: Paths should conform to the following pattern: + * "/_cluster/stats" + * + * The value in these maps represents a path root mapped to a list of paths to field values. + * If the value mapped to an API is an empty map, no fields will be redacted from the API response. + * + * NOTE: Keys in this map should consist of root components of the response body; e.g.,: + * "indices" + * + * Values in these maps should consist of the remaining fields in the path + * to the supported value separated by periods; e.g.,: + * "shards.total", + * "shards.index.shards.min" + * + * In this example for ClusterStats, the response will only include + * the values at the end of these two paths: + * "/_cluster/stats": { + * "indices": [ + * "shards.total", + * "shards.index.shards.min" + * ] + * } + */ + private var supportedApiList = HashMap>>() + + init { + val supportedJsonPayloads = SupportedClusterMetricsSettings::class.java.getResource(RESOURCE_FILE) + + @Suppress("UNCHECKED_CAST") + if (supportedJsonPayloads != null) + supportedApiList = XContentHelper.convertToMap(JsonXContent.jsonXContent, supportedJsonPayloads.readText(), false) as HashMap>> + } + + /** + * Returns the map of all supported json payload associated with the provided path from supportedApiList. + * @param path The path for the requested API. + * @return The map of the supported json payload for the requested API. + * @throws IllegalArgumentException When supportedApiList does not contain a value for the provided key. + */ + fun getSupportedJsonPayload(path: String): Map> { + return supportedApiList[path] ?: throw IllegalArgumentException("API path not in supportedApiList.") + } + + /** + * Will return an [ActionRequest] for the API associated with that path. + * Will otherwise throw an exception. + * @param clusterMetricsInput The [ClusterMetricsInput] to resolve. + * @throws IllegalArgumentException when the requested API is not supported. + * @return The [ActionRequest] for the API associated with the provided [ClusterMetricsInput]. + */ + fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest { + val pathParams = clusterMetricsInput.parsePathParams() + return when (clusterMetricsInput.clusterMetricType) { + ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest() + ClusterMetricType.CAT_RECOVERY -> { + if (pathParams.isEmpty()) return RecoveryRequest() + val pathParamsArray = pathParams.split(",").toTypedArray() + return RecoveryRequest(*pathParamsArray) + } + ClusterMetricType.CAT_SNAPSHOTS -> { + return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS)) + } + ClusterMetricType.CAT_TASKS -> ListTasksRequest() + ClusterMetricType.CLUSTER_HEALTH -> { + if (pathParams.isEmpty()) return ClusterHealthRequest() + val pathParamsArray = pathParams.split(",").toTypedArray() + return ClusterHealthRequest(*pathParamsArray) + } + ClusterMetricType.CLUSTER_SETTINGS -> ClusterStateRequest().routingTable(false).nodes(false) + ClusterMetricType.CLUSTER_STATS -> { + if (pathParams.isEmpty()) return ClusterStatsRequest() + val pathParamsArray = pathParams.split(",").toTypedArray() + return ClusterStatsRequest(*pathParamsArray) + } + ClusterMetricType.NODES_STATS -> NodesStatsRequest().addMetrics( + "os", + "process", + "jvm", + "thread_pool", + "fs", + "transport", + "http", + "breaker", + "script", + "discovery", + "ingest", + "adaptive_selection", + "script_cache", + "indexing_pressure", + "shard_indexing_pressure" + ) + else -> throw IllegalArgumentException("Unsupported API.") + } + } + + /** + * Confirms whether the provided path is in [supportedApiList]. + * Throws an exception if the provided path is not on the list; otherwise performs no action. + * @param clusterMetricsInput The [ClusterMetricsInput] to validate. + * @throws IllegalArgumentException when supportedApiList does not contain the provided path. + */ + fun validateApiType(clusterMetricsInput: ClusterMetricsInput) { + if (!supportedApiList.keys.contains(clusterMetricsInput.clusterMetricType.defaultPath)) + throw IllegalArgumentException("API path not in supportedApiList.") + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt new file mode 100644 index 000000000..cb70d578a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt @@ -0,0 +1,128 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.action.ActionResponse +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksRequest +import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse +import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest +import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest +import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse +import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest +import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse +import org.opensearch.action.admin.indices.recovery.RecoveryRequest +import org.opensearch.action.admin.indices.recovery.RecoveryResponse +import org.opensearch.alerting.core.model.ClusterMetricsInput +import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType +import org.opensearch.alerting.elasticapi.convertToMap +import org.opensearch.alerting.settings.SupportedClusterMetricsSettings +import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest +import org.opensearch.client.Client +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.support.XContentMapValues + +/** + * Calls the appropriate transport action for the API requested in the [clusterMetricsInput]. + * @param clusterMetricsInput The [ClusterMetricsInput] to resolve. + * @param client The [Client] used to call the respective transport action. + * @throws IllegalArgumentException When the requested API is not supported by this feature. + */ +fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse { + val request = resolveToActionRequest(clusterMetricsInput) + return when (clusterMetricsInput.clusterMetricType) { + ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest).get() + ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get() + ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get() + ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get() + ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get() + ClusterMetricType.CLUSTER_SETTINGS -> { + val metadata = client.admin().cluster().state(request as ClusterStateRequest).get().state.metadata + return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY) + } + ClusterMetricType.CLUSTER_STATS -> client.admin().cluster().clusterStats(request as ClusterStatsRequest).get() + ClusterMetricType.NODES_STATS -> client.admin().cluster().nodesStats(request as NodesStatsRequest).get() + else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}") + } +} + +/** + * Populates a [HashMap] with the values in the [ActionResponse]. + * @return The [ActionResponse] values formatted in a [HashMap]. + * @throws IllegalArgumentException when the [ActionResponse] is not supported by this feature. + */ +fun ActionResponse.toMap(): Map { + return when (this) { + is ClusterHealthResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CLUSTER_HEALTH.defaultPath) + ) + is ClusterStatsResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CLUSTER_STATS.defaultPath) + ) + is ClusterGetSettingsResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CLUSTER_SETTINGS.defaultPath) + ) + is NodesStatsResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.NODES_STATS.defaultPath) + ) + is PendingClusterTasksResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_PENDING_TASKS.defaultPath) + ) + is RecoveryResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_RECOVERY.defaultPath) + ) + is GetSnapshotsResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_SNAPSHOTS.defaultPath) + ) + is ListTasksResponse -> redactFieldsFromResponse( + this.convertToMap(), + SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricType.CAT_TASKS.defaultPath) + ) + else -> throw IllegalArgumentException("Unsupported ActionResponse type: ${this.javaClass.name}") + } +} + +/** + * Populates a [HashMap] with only the values that support being exposed to users. + * @param mappedActionResponse The response from the [ClusterMetricsInput] API call. + * @param supportedJsonPayload The JSON payload as configured in [SupportedClusterMetricsSettings.RESOURCE_FILE]. + * @return The response values [HashMap] without the redacted fields. + */ +@Suppress("UNCHECKED_CAST") +fun redactFieldsFromResponse( + mappedActionResponse: Map, + supportedJsonPayload: Map> +): Map { + return when { + supportedJsonPayload.isEmpty() -> mappedActionResponse + else -> { + val output = hashMapOf() + for ((key, value) in supportedJsonPayload) { + when (val mappedValue = mappedActionResponse[key]) { + is Map<*, *> -> output[key] = XContentMapValues.filter( + mappedActionResponse[key] as MutableMap?, + value.toTypedArray(), arrayOf() + ) + else -> output[key] = mappedValue ?: hashMapOf() + } + } + output + } + } +} diff --git a/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json b/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json new file mode 100644 index 000000000..9ed045ab3 --- /dev/null +++ b/alerting/src/main/resources/org/opensearch/alerting/settings/supported_json_payloads.json @@ -0,0 +1,10 @@ +{ + "/_cat/pending_tasks": {}, + "/_cat/recovery": {}, + "/_cat/snapshots": {}, + "/_cat/tasks": {}, + "/_cluster/health": {}, + "/_cluster/settings": {}, + "/_cluster/stats": {}, + "/_nodes/stats": {} +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 786331672..c73c8a9a2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -35,6 +35,7 @@ import org.opensearch.client.Request import org.opensearch.client.Response import org.opensearch.client.RestClient import org.opensearch.client.WarningFailureException +import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler @@ -51,7 +52,6 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.SearchModule import java.net.URLEncoder import java.nio.file.Files -import java.nio.file.Path import java.time.Instant import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -1020,7 +1020,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { false ) proxy.getExecutionData(false)?.let { - val path = Path.of("$jacocoBuildPath/integTest.exec") + val path = PathUtils.get("$jacocoBuildPath/integTest.exec") Files.write(path, it) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index d5e1f4a64..dc505ae41 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -818,6 +818,154 @@ class MonitorRunnerIT : AlertingRestTestCase() { } */ + fun `test create ClusterMetricsInput monitor with ClusterHealth API`() { + // GIVEN + val path = "/_cluster/health" + val input = randomClusterMetricsInput(path = path) + val monitor = createMonitor(randomClusterMetricsMonitor(inputs = listOf(input))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0] + val errorMessage = inputResults["error"] + + assertEquals(monitor.name, output["monitor_name"]) + assertTrue( + "Monitor results should contain cluster_name, but found: $resultsContent", + resultsContent.toString().contains("cluster_name") + ) + assertNull("There should not be an error message, but found: $errorMessage", errorMessage) + } + + fun `test create ClusterMetricsInput monitor with ClusterStats API`() { + // GIVEN + val path = "/_cluster/stats" + val input = randomClusterMetricsInput(path = path) + val monitor = createMonitor(randomClusterMetricsMonitor(inputs = listOf(input))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0] + val errorMessage = inputResults["error"] + + assertEquals(monitor.name, output["monitor_name"]) + assertTrue( + "Monitor results should contain monitor_name, but found: $resultsContent", + resultsContent.toString().contains("memory_size_in_bytes") + ) + assertNull("There should not be an error message, but found: $errorMessage", errorMessage) + } + + fun `test create ClusterMetricsInput monitor with alert triggered`() { + // GIVEN + putAlertMappings() + val trigger = randomQueryLevelTrigger( + condition = Script( + """ + return ctx.results[0].number_of_pending_tasks < 1 + """.trimIndent() + ), + destinationId = createDestination().id + ) + val path = "/_cluster/health" + val input = randomClusterMetricsInput(path = path) + val monitor = createMonitor(randomClusterMetricsMonitor(inputs = listOf(input), triggers = listOf(trigger))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + val triggerResults = output.objectMap("trigger_results").values + for (triggerResult in triggerResults) { + assertTrue( + "This triggerResult should be triggered: $triggerResult", + triggerResult.objectMap("action_results").isNotEmpty() + ) + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert not saved, $output", 1, alerts.size) + verifyAlert(alerts.single(), monitor, ACTIVE) + } + + fun `test create ClusterMetricsInput monitor with no alert triggered`() { + // GIVEN + putAlertMappings() + val trigger = randomQueryLevelTrigger( + condition = Script( + """ + return ctx.results[0].status.equals("red") + """.trimIndent() + ) + ) + val path = "/_cluster/stats" + val input = randomClusterMetricsInput(path = path) + val monitor = createMonitor(randomClusterMetricsMonitor(inputs = listOf(input), triggers = listOf(trigger))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + val triggerResults = output.objectMap("trigger_results").values + for (triggerResult in triggerResults) { + assertTrue( + "This triggerResult should not be triggered: $triggerResult", + triggerResult.objectMap("action_results").isEmpty() + ) + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor, output: $output", 0, alerts.size) + } + + fun `test create ClusterMetricsInput monitor for ClusterHealth API with path parameters`() { + // GIVEN + val indices = (1..5).map { createTestIndex() }.toTypedArray() + val pathParams = indices.joinToString(",") + val path = "/_cluster/health/" + val input = randomClusterMetricsInput( + path = path, + pathParams = pathParams + ) + val monitor = createMonitor(randomClusterMetricsMonitor(inputs = listOf(input))) + + // WHEN + val response = executeMonitor(monitor.id) + + // THEN + val output = entityAsMap(response) + val inputResults = output.stringMap("input_results") + val resultsContent = (inputResults?.get("results") as ArrayList<*>)[0] + val errorMessage = inputResults["error"] + + assertEquals(monitor.name, output["monitor_name"]) + assertTrue( + "Monitor results should contain cluster_name, but found: $resultsContent", + resultsContent.toString().contains("cluster_name") + ) + assertNull("There should not be an error message, but found: $errorMessage", errorMessage) + } + + // TODO: Once an API is implemented that supports adding/removing entries on the + // SupportedApiSettings::supportedApiList, create an test that simulates executing + // a preexisting ClusterMetricsInput monitor for an API that has been removed from the supportedApiList. + // This will likely involve adding an API to the list before creating the monitor, and then removing + // the API from the list before executing the monitor. + fun `test execute monitor with custom webhook destination and denied host`() { listOf("http://10.1.1.1", "127.0.0.1").forEach { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 1195c6b0c..14781c330 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -2,6 +2,7 @@ * Copyright OpenSearch Contributors * SPDX-License-Identifier: Apache-2.0 */ + package org.opensearch.alerting import junit.framework.TestCase.assertNull @@ -9,6 +10,7 @@ import org.apache.http.Header import org.apache.http.HttpEntity import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter +import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.IntervalSchedule import org.opensearch.alerting.core.model.Schedule @@ -127,6 +129,24 @@ fun randomBucketLevelMonitor( ) } +fun randomClusterMetricsMonitor( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User = randomUser(), + inputs: List = listOf(randomClusterMetricsInput()), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false +): Monitor { + return Monitor( + name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() + ) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), @@ -351,6 +371,14 @@ fun randomQueryLevelTriggerRunResult(): QueryLevelTriggerRunResult { return QueryLevelTriggerRunResult("trigger-name", true, null, map) } +fun randomClusterMetricsInput( + path: String = ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH.defaultPath, + pathParams: String = "", + url: String = "" +): ClusterMetricsInput { + return ClusterMetricsInput(path, pathParams, url) +} + fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult { val map = mutableMapOf() map.plus(Pair("key1", randomActionRunResult())) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensionsTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensionsTests.kt new file mode 100644 index 000000000..00bf53a4f --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensionsTests.kt @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.test.OpenSearchTestCase + +class SupportedClusterMetricsSettingsExtensionsTests : OpenSearchTestCase() { + private var expectedResponse = hashMapOf() + private var mappedResponse = hashMapOf() + private var supportedJsonPayload = hashMapOf>() + + fun `test redactFieldsFromResponse with non-empty supportedJsonPayload`() { + // GIVEN + mappedResponse = hashMapOf( + ("pathRoot1" to hashMapOf(("pathRoot1_subPath1" to 11), ("pathRoot1_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath1" to 121), ("pathRoot1_subPath2_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath2_subPath1" to 1221))))))), + ("pathRoot2" to hashMapOf(("pathRoot2_subPath1" to 21), ("pathRoot2_subPath2" to setOf(221, 222, "223string")))), + ("pathRoot3" to hashMapOf(("pathRoot3_subPath1" to 31), ("pathRoot3_subPath2" to setOf(321, 322, "323string")))) + ) + + supportedJsonPayload = hashMapOf( + ("pathRoot1" to arrayListOf("pathRoot1_subPath1", "pathRoot1_subPath2.pathRoot1_subPath2_subPath2.pathRoot1_subPath2_subPath2_subPath1")), + ("pathRoot2" to arrayListOf("pathRoot2_subPath2")), + ("pathRoot3" to arrayListOf()) + ) + + expectedResponse = hashMapOf( + ("pathRoot1" to hashMapOf(("pathRoot1_subPath1" to 11), ("pathRoot1_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath2_subPath1" to 1221))))))), + ("pathRoot2" to hashMapOf(("pathRoot2_subPath2" to setOf(221, 222, "223string")))), + ("pathRoot3" to hashMapOf(("pathRoot3_subPath1" to 31), ("pathRoot3_subPath2" to setOf(321, 322, "323string")))) + ) + + // WHEN + val result = redactFieldsFromResponse(mappedResponse, supportedJsonPayload) + + // THEN + assertEquals(expectedResponse, result) + } + + fun `test redactFieldsFromResponse with empty supportedJsonPayload`() { + // GIVEN + mappedResponse = hashMapOf( + ("pathRoot1" to hashMapOf(("pathRoot1_subPath1" to 11), ("pathRoot1_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath1" to 121), ("pathRoot1_subPath2_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath2_subPath1" to 1221))))))), + ("pathRoot2" to hashMapOf(("pathRoot2_subPath1" to 21), ("pathRoot2_subPath2" to setOf(221, 222, "223string")))), + ("pathRoot3" to 3) + ) + + expectedResponse = hashMapOf( + ("pathRoot1" to hashMapOf(("pathRoot1_subPath1" to 11), ("pathRoot1_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath1" to 121), ("pathRoot1_subPath2_subPath2" to hashMapOf(("pathRoot1_subPath2_subPath2_subPath1" to 1221))))))), + ("pathRoot2" to hashMapOf(("pathRoot2_subPath1" to 21), ("pathRoot2_subPath2" to setOf(221, 222, "223string")))), + ("pathRoot3" to 3) + ) + + // WHEN + val result = redactFieldsFromResponse(mappedResponse, supportedJsonPayload) + + // THEN + assertEquals(expectedResponse, result) + } +} diff --git a/core/build.gradle b/core/build.gradle index 0cab8b5c6..62713f903 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -16,6 +16,7 @@ dependencies { compile "org.opensearch.client:opensearch-rest-client:${opensearch_version}" compile 'com.google.googlejavaformat:google-java-format:1.10.0' compile "org.opensearch:common-utils:${common_utils_version}" + compile 'commons-validator:commons-validator:1.7' testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/ClusterMetricsInput.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/ClusterMetricsInput.kt new file mode 100644 index 000000000..243deb059 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/ClusterMetricsInput.kt @@ -0,0 +1,311 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.core.model + +import org.apache.commons.validator.routines.UrlValidator +import org.apache.http.client.utils.URIBuilder +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import java.io.IOException +import java.net.URI + +val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '#', '>', '<', ' ') + +/** + * This is a data class for a URI type of input for Monitors specifically for local clusters. + */ +data class ClusterMetricsInput( + var path: String, + var pathParams: String = "", + var url: String +) : Input { + val clusterMetricType: ClusterMetricType + val constructedUri: URI + + // Verify parameters are valid during creation + init { + require(validateFields()) { + "The uri.api_type field, uri.path field, or uri.uri field must be defined." + } + + // Create an UrlValidator that only accepts "http" and "https" as valid scheme and allows local URLs. + val urlValidator = UrlValidator(arrayOf("http", "https"), UrlValidator.ALLOW_LOCAL_URLS) + + // Build url field by field if not provided as whole. + constructedUri = toConstructedUri() + + require(urlValidator.isValid(constructedUri.toString())) { + "Invalid URI constructed from the path and path_params inputs, or the url input." + } + + if (url.isNotEmpty() && validateFieldsNotEmpty()) + require(constructedUri == constructUrlFromInputs()) { + "The provided URL and URI fields form different URLs." + } + + require(constructedUri.host.toLowerCase() == SUPPORTED_HOST) { + "Only host '$SUPPORTED_HOST' is supported." + } + require(constructedUri.port == SUPPORTED_PORT) { + "Only port '$SUPPORTED_PORT' is supported." + } + + clusterMetricType = findApiType(constructedUri.path) + this.parseEmptyFields() + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(URI_FIELD) + .field(API_TYPE_FIELD, clusterMetricType) + .field(PATH_FIELD, path) + .field(PATH_PARAMS_FIELD, pathParams) + .field(URL_FIELD, url) + .endObject() + .endObject() + } + + override fun name(): String { + return URI_FIELD + } + + override fun writeTo(out: StreamOutput) { + out.writeString(clusterMetricType.toString()) + out.writeString(path) + out.writeString(pathParams) + out.writeString(url) + } + + companion object { + const val SUPPORTED_SCHEME = "http" + const val SUPPORTED_HOST = "localhost" + const val SUPPORTED_PORT = 9200 + + const val API_TYPE_FIELD = "api_type" + const val PATH_FIELD = "path" + const val PATH_PARAMS_FIELD = "path_params" + const val URL_FIELD = "url" + const val URI_FIELD = "uri" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField("uri"), CheckedFunction { parseInner(it) }) + + /** + * This parse function uses [XContentParser] to parse JSON input and store corresponding fields to create a [ClusterMetricsInput] object + */ + @JvmStatic @Throws(IOException::class) + private fun parseInner(xcp: XContentParser): ClusterMetricsInput { + var path = "" + var pathParams = "" + var url = "" + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + when (fieldName) { + PATH_FIELD -> path = xcp.text() + PATH_PARAMS_FIELD -> pathParams = xcp.text() + URL_FIELD -> url = xcp.text() + } + } + return ClusterMetricsInput(path, pathParams, url) + } + } + + /** + * Constructs the [URI] using either the provided [url], or the + * supported scheme, host, and port and provided [path]+[pathParams]. + * @return The [URI] constructed from [url] if it's defined; + * otherwise a [URI] constructed from the provided [URI] fields. + */ + private fun toConstructedUri(): URI { + return if (url.isEmpty()) { + constructUrlFromInputs() + } else { + URIBuilder(url).build() + } + } + + /** + * Isolates just the path parameters from the [ClusterMetricsInput] URI. + * @return The path parameters portion of the [ClusterMetricsInput] URI. + * @throws IllegalArgumentException if the [ClusterMetricType] requires path parameters, but none are supplied; + * or when path parameters are provided for an [ClusterMetricType] that does not use path parameters. + */ + fun parsePathParams(): String { + val path = this.constructedUri.path + val apiType = this.clusterMetricType + + var pathParams: String + if (this.pathParams.isNotEmpty()) { + pathParams = this.pathParams + } else { + val prependPath = if (apiType.supportsPathParams) apiType.prependPath else apiType.defaultPath + pathParams = path.removePrefix(prependPath) + pathParams = pathParams.removeSuffix(apiType.appendPath) + } + + if (pathParams.isNotEmpty()) { + pathParams = pathParams.trim('/') + ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character -> + if (pathParams.contains(character)) + throw IllegalArgumentException("The provided path parameters contain invalid characters or spaces. Please omit: ${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}") + } + } + + if (apiType.requiresPathParams && pathParams.isEmpty()) + throw IllegalArgumentException("The API requires path parameters.") + if (!apiType.supportsPathParams && pathParams.isNotEmpty()) + throw IllegalArgumentException("The API does not use path parameters.") + + return pathParams + } + + /** + * Examines the path of a [ClusterMetricsInput] to determine which API is being called. + * @param uriPath The path to examine. + * @return The [ClusterMetricType] associated with the [ClusterMetricsInput] monitor. + * @throws IllegalArgumentException when the API to call cannot be determined from the URI. + */ + private fun findApiType(uriPath: String): ClusterMetricType { + var apiType = ClusterMetricType.BLANK + ClusterMetricType.values() + .filter { option -> option != ClusterMetricType.BLANK } + .forEach { option -> + if (uriPath.startsWith(option.prependPath) || uriPath.startsWith(option.defaultPath)) + apiType = option + } + if (apiType.isBlank()) + throw IllegalArgumentException("The API could not be determined from the provided URI.") + return apiType + } + + /** + * Constructs a [URI] from the supported scheme, host, and port, and the provided [path], and [pathParams]. + * @return The constructed [URI]. + */ + private fun constructUrlFromInputs(): URI { + val uriBuilder = URIBuilder() + .setScheme(SUPPORTED_SCHEME) + .setHost(SUPPORTED_HOST) + .setPort(SUPPORTED_PORT) + .setPath(path + pathParams) + return uriBuilder.build() + } + + /** + * If [url] field is empty, populates it with [constructedUri]. + * If [path] and [pathParams] are empty, populates them with values from [url]. + */ + private fun parseEmptyFields() { + if (pathParams.isEmpty()) + pathParams = this.parsePathParams() + if (path.isEmpty()) + path = if (pathParams.isEmpty()) clusterMetricType.defaultPath else clusterMetricType.prependPath + if (url.isEmpty()) + url = constructedUri.toString() + } + + /** + * Helper function to confirm at least [url], or required URI component fields are defined. + * @return TRUE if at least either [url] or the other components are provided; otherwise FALSE. + */ + private fun validateFields(): Boolean { + return url.isNotEmpty() || validateFieldsNotEmpty() + } + + /** + * Confirms that required URI component fields are defined. + * Only validating path for now, as that's the only required field. + * @return TRUE if all those fields are defined; otherwise FALSE. + */ + private fun validateFieldsNotEmpty(): Boolean { + return path.isNotEmpty() + } + + /** + * An enum class to quickly reference various supported API. + */ + enum class ClusterMetricType( + val defaultPath: String, + val prependPath: String, + val appendPath: String, + val supportsPathParams: Boolean, + val requiresPathParams: Boolean + ) { + BLANK("", "", "", false, false), + CAT_PENDING_TASKS( + "/_cat/pending_tasks", + "/_cat/pending_tasks", + "", + false, + false + ), + CAT_RECOVERY( + "/_cat/recovery", + "/_cat/recovery", + "", + true, + false + ), + CAT_SNAPSHOTS( + "/_cat/snapshots", + "/_cat/snapshots", + "", + true, + true + ), + CAT_TASKS( + "/_cat/tasks", + "/_cat/tasks", + "", + false, + false + ), + CLUSTER_HEALTH( + "/_cluster/health", + "/_cluster/health", + "", + true, + false + ), + CLUSTER_SETTINGS( + "/_cluster/settings", + "/_cluster/settings", + "", + false, + false + ), + CLUSTER_STATS( + "/_cluster/stats", + "/_cluster/stats", + "", + true, + false + ), + NODES_STATS( + "/_nodes/stats", + "/_nodes", + "", + false, + false + ); + + /** + * @return TRUE if the [ClusterMetricType] is [BLANK]; otherwise FALSE. + */ + fun isBlank(): Boolean { + return this === BLANK + } + } +} diff --git a/core/src/test/kotlin/org/opensearch/alerting/core/model/ClusterMetricsInputTests.kt b/core/src/test/kotlin/org/opensearch/alerting/core/model/ClusterMetricsInputTests.kt new file mode 100644 index 000000000..2fe831c54 --- /dev/null +++ b/core/src/test/kotlin/org/opensearch/alerting/core/model/ClusterMetricsInputTests.kt @@ -0,0 +1,447 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.core.model + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class ClusterMetricsInputTests { + private var path = "/_cluster/health" + private var pathParams = "" + private var url = "" + + @Test + fun `test valid ClusterMetricsInput creation using HTTP URI component fields`() { + // GIVEN + val testUrl = "http://localhost:9200/_cluster/health" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(path, clusterMetricsInput.path) + assertEquals(pathParams, clusterMetricsInput.pathParams) + assertEquals(testUrl, clusterMetricsInput.url) + } + + @Test + fun `test valid ClusterMetricsInput creation using HTTP url field`() { + // GIVEN + path = "" + url = "http://localhost:9200/_cluster/health" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(url, clusterMetricsInput.url) + } + + @Test + fun `test valid ClusterMetricsInput creation using HTTPS url field`() { + // GIVEN + path = "" + url = "https://localhost:9200/_cluster/health" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(url, clusterMetricsInput.url) + } + + @Test + fun `test invalid path`() { + // GIVEN + path = "///" + + // WHEN + THEN + assertFailsWith("Invalid URL.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test invalid url`() { + // GIVEN + url = "///" + + // WHEN + THEN + assertFailsWith("Invalid URL.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test url field and URI component fields create equal URI`() { + // GIVEN + url = "http://localhost:9200/_cluster/health" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(path, clusterMetricsInput.path) + assertEquals(pathParams, clusterMetricsInput.pathParams) + assertEquals(url, clusterMetricsInput.url) + assertEquals(url, clusterMetricsInput.constructedUri.toString()) + } + + @Test + fun `test url field and URI component fields with path params create equal URI`() { + // GIVEN + path = "/_cluster/health/" + pathParams = "index1,index2,index3,index4,index5" + url = "http://localhost:9200/_cluster/health/index1,index2,index3,index4,index5" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(path, clusterMetricsInput.path) + assertEquals(pathParams, clusterMetricsInput.pathParams) + assertEquals(url, clusterMetricsInput.url) + assertEquals(url, clusterMetricsInput.constructedUri.toString()) + } + + @Test + fun `test url field and URI component fields create different URI`() { + // GIVEN + url = "http://localhost:9200/_cluster/stats" + + // WHEN + THEN + assertFailsWith("The provided URL and URI fields form different URLs.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test url field and URI component fields with path params create different URI`() { + // GIVEN + pathParams = "index1,index2,index3,index4,index5" + url = "http://localhost:9200/_cluster/stats/index1,index2,index3,index4,index5" + + // WHEN + THEN + assertFailsWith("The provided URL and URI fields form different URLs.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test ClusterMetricsInput creation when all inputs are empty`() { + // GIVEN + path = "" + pathParams = "" + url = "" + + // WHEN + THEN + assertFailsWith("The uri.api_type field, uri.path field, or uri.uri field must be defined.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test ClusterMetricsInput creation when all inputs but path params are empty`() { + // GIVEN + path = "" + pathParams = "index1,index2,index3,index4,index5" + url = "" + + // WHEN + THEN + assertFailsWith("The uri.api_type field, uri.path field, or uri.uri field must be defined.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test invalid scheme in url field`() { + // GIVEN + path = "" + url = "invalidScheme://localhost:9200/_cluster/health" + + // WHEN + THEN + assertFailsWith("Invalid URL.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test invalid host in url field`() { + // GIVEN + path = "" + url = "http://127.0.0.1:9200/_cluster/health" + + // WHEN + THEN + assertFailsWith("Only host '${ClusterMetricsInput.SUPPORTED_HOST}' is supported.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test invalid port in url field`() { + // GIVEN + path = "" + url = "http://localhost:${ClusterMetricsInput.SUPPORTED_PORT + 1}/_cluster/health" + + // WHEN + THEN + assertFailsWith("Only port '${ClusterMetricsInput.SUPPORTED_PORT}' is supported.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test parsePathParams with no path params`() { + // GIVEN + val testUrl = "http://localhost:9200/_cluster/health" + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // WHEN + val params = clusterMetricsInput.parsePathParams() + + // THEN + assertEquals(pathParams, params) + assertEquals(testUrl, clusterMetricsInput.constructedUri.toString()) + } + + @Test + fun `test parsePathParams with path params as URI field`() { + // GIVEN + path = "/_cluster/health/" + pathParams = "index1,index2,index3,index4,index5" + val testUrl = "http://localhost:9200/_cluster/health/index1,index2,index3,index4,index5" + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // WHEN + val params = clusterMetricsInput.parsePathParams() + + // THEN + assertEquals(pathParams, params) + assertEquals(testUrl, clusterMetricsInput.constructedUri.toString()) + } + + @Test + fun `test parsePathParams with path params in url`() { + // GIVEN + path = "" + val testParams = "index1,index2,index3,index4,index5" + url = "http://localhost:9200/_cluster/health/index1,index2,index3,index4,index5" + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // WHEN + val params = clusterMetricsInput.parsePathParams() + + // THEN + assertEquals(testParams, params) + assertEquals(url, clusterMetricsInput.constructedUri.toString()) + } + + @Test + fun `test parsePathParams with no path params for ApiType that requires path params`() { + // GIVEN + path = "/_cat/snapshots" + + // WHEN + THEN + assertFailsWith("The API requires path parameters.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test parsePathParams with path params for ApiType that doesn't support path params`() { + // GIVEN + path = "/_cluster/settings" + pathParams = "index1,index2,index3,index4,index5" + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // WHEN + THEN + assertFailsWith("The API does not use path parameters.") { + clusterMetricsInput.parsePathParams() + } + } + + @Test + fun `test parsePathParams with path params containing illegal characters`() { + var testCount = 0 // Start off with count of 1 to account for ApiType.BLANK + ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character -> + // GIVEN + pathParams = "index1,index2,$character,index4,index5" + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // WHEN + THEN + assertFailsWith( + "The provided path parameters contain invalid characters or spaces. Please omit: ${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}" + ) { + clusterMetricsInput.parsePathParams() + } + testCount++ + } + assertEquals(ILLEGAL_PATH_PARAMETER_CHARACTERS.size, testCount) + } + + @Test + fun `test ClusterMetricsInput correctly determines ApiType when path is provided as URI component`() { + var testCount = 1 // Start off with count of 1 to account for ApiType.BLANK + ClusterMetricsInput.ClusterMetricType.values() + .filter { enum -> enum != ClusterMetricsInput.ClusterMetricType.BLANK } + .forEach { testApiType -> + // GIVEN + path = testApiType.defaultPath + pathParams = if (testApiType.supportsPathParams) "index1,index2,index3,index4,index5" else "" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(testApiType, clusterMetricsInput.clusterMetricType) + testCount++ + } + assertEquals(ClusterMetricsInput.ClusterMetricType.values().size, testCount) + } + + @Test + fun `test ClusterMetricsInput correctly determines ApiType when path and path params are provided as URI components`() { + var testCount = 1 // Start off with count of 1 to account for ApiType.BLANK + ClusterMetricsInput.ClusterMetricType.values() + .filter { enum -> enum != ClusterMetricsInput.ClusterMetricType.BLANK } + .forEach { testApiType -> + // GIVEN + path = testApiType.defaultPath + pathParams = "index1,index2,index3,index4,index5" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(testApiType, clusterMetricsInput.clusterMetricType) + testCount++ + } + assertEquals(ClusterMetricsInput.ClusterMetricType.values().size, testCount) + } + + @Test + fun `test ClusterMetricsInput correctly determines ApiType when path is provided in URL field`() { + var testCount = 1 // Start off with count of 1 to account for ApiType.BLANK + ClusterMetricsInput.ClusterMetricType.values() + .filter { enum -> enum != ClusterMetricsInput.ClusterMetricType.BLANK } + .forEach { testApiType -> + // GIVEN + path = "" + pathParams = if (testApiType.supportsPathParams) "index1,index2,index3,index4,index5" else "" + url = "http://localhost:9200${testApiType.defaultPath}" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(testApiType, clusterMetricsInput.clusterMetricType) + testCount++ + } + assertEquals(ClusterMetricsInput.ClusterMetricType.values().size, testCount) + } + + @Test + fun `test ClusterMetricsInput correctly determines ApiType when path and path params are provided in URL field`() { + var testCount = 1 // Start off with count of 1 to account for ApiType.BLANK + ClusterMetricsInput.ClusterMetricType.values() + .filter { enum -> enum != ClusterMetricsInput.ClusterMetricType.BLANK } + .forEach { testApiType -> + // GIVEN + path = "" + pathParams = if (testApiType.supportsPathParams) "/index1,index2,index3,index4,index5" else "" + url = "http://localhost:9200${testApiType.defaultPath}$pathParams" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(testApiType, clusterMetricsInput.clusterMetricType) + testCount++ + } + assertEquals(ClusterMetricsInput.ClusterMetricType.values().size, testCount) + } + + @Test + fun `test ClusterMetricsInput cannot determine ApiType when invalid path is provided as URI component`() { + // GIVEN + path = "/_cat/paws" + + // WHEN + THEN + assertFailsWith("The API could not be determined from the provided URI.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test ClusterMetricsInput cannot determine ApiType when invalid path and path params are provided as URI components`() { + // GIVEN + path = "/_cat/paws" + pathParams = "index1,index2,index3,index4,index5" + + // WHEN + THEN + assertFailsWith("The API could not be determined from the provided URI.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test ClusterMetricsInput cannot determine ApiType when invaid path is provided in URL`() { + // GIVEN + path = "" + url = "http://localhost:9200/_cat/paws" + + // WHEN + THEN + assertFailsWith("The API could not be determined from the provided URI.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test ClusterMetricsInput cannot determine ApiType when invaid path and path params are provided in URL`() { + // GIVEN + path = "" + url = "http://localhost:9200/_cat/paws/index1,index2,index3,index4,index5" + + // WHEN + THEN + assertFailsWith("The API could not be determined from the provided URI.") { + ClusterMetricsInput(path, pathParams, url) + } + } + + @Test + fun `test parseEmptyFields populates empty path and path_params when url is provided`() { + // GIVEN + path = "" + pathParams = "" + val testPath = "/_cluster/health" + val testPathParams = "index1,index2,index3,index4,index5" + url = "http://localhost:9200$testPath$testPathParams" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(testPath, clusterMetricsInput.path) + assertEquals(testPathParams, clusterMetricsInput.pathParams) + assertEquals(url, clusterMetricsInput.url) + } + + @Test + fun `test parseEmptyFields populates empty url field when path and path_params are provided`() { + // GIVEN + path = "/_cluster/health/" + pathParams = "index1,index2,index3,index4,index5" + val testUrl = "http://localhost:9200$path$pathParams" + + // WHEN + val clusterMetricsInput = ClusterMetricsInput(path, pathParams, url) + + // THEN + assertEquals(path, clusterMetricsInput.path) + assertEquals(pathParams, clusterMetricsInput.pathParams) + assertEquals(testUrl, clusterMetricsInput.url) + } +} diff --git a/release-notes/opensearch-alerting.release-notes-1.3.0.0.md b/release-notes/opensearch-alerting.release-notes-1.3.0.0.md new file mode 100644 index 000000000..ec8635dc6 --- /dev/null +++ b/release-notes/opensearch-alerting.release-notes-1.3.0.0.md @@ -0,0 +1,18 @@ +## Version 1.3.0.0 2022-03-09 + +Compatible with OpenSearch 1.3.0 + +### Enhancements +* Implemented support for ClusterMetrics monitors. ([#221](https://github.com/opensearch-project/alerting/pull/221)) + +### Maintenance +* Bumps to version 1.3. ([#248](https://github.com/opensearch-project/alerting/pull/248)) +* Update GitHub Actions to run on all branches. ([#256](https://github.com/opensearch-project/alerting/pull/256)) +* Added support for JDK 8 and 14. ([#335](https://github.com/opensearch-project/alerting/pull/335)) + +### Bug Fixes +* Fix running Alerting security tests in GitHub Actions. ([#252](https://github.com/opensearch-project/alerting/pull/252)) + +### Documentation +* Added 1.3 release notes. ([#336](https://github.com/opensearch-project/alerting/pull/336)) +* Updated DEVELOPER_GUIDE.md to reference changes to the supported JDKs. ([#338](https://github.com/opensearch-project/alerting/pull/338)) \ No newline at end of file