Skip to content

Commit

Permalink
Added layer for creating and updating the workflow (opensearch-projec…
Browse files Browse the repository at this point in the history
…t#831)

* Renamed chainedFindings to chainedMonitorFindings

* Removed unecessary mappings from workflow definition

* Improved logging when saving the workflows

* Added a workflow id in response

* Added role check and index access once the workflow is being created

* Updated mappings for the workflow

---------

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored and eirsep committed May 24, 2023
1 parent 841ac90 commit 051a819
Show file tree
Hide file tree
Showing 9 changed files with 1,807 additions and 13 deletions.
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ dependencies {
testImplementation "org.mockito:mockito-core:${versions.mockito}"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-painless:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-mustache-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
Expand Down Expand Up @@ -80,6 +81,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand Down Expand Up @@ -119,7 +121,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")

@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"

@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"

@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
Expand Down Expand Up @@ -191,8 +193,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java)

ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java)
)
}

Expand All @@ -204,7 +206,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
DocumentLevelTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTI

fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR

fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR

/**
* Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used
* as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values.
Expand Down
65 changes: 65 additions & 0 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.ChainedMonitorFindings
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Delegate
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.DocumentLevelTrigger
Expand All @@ -47,7 +50,10 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.Schedule
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Sequence
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.Workflow.WorkflowType
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
Expand Down Expand Up @@ -218,6 +224,61 @@ fun randomDocumentLevelMonitor(
)
}

fun randomWorkflow(
id: String = Workflow.NO_ID,
monitorIds: List<String>,
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = randomBoolean(),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS)
): Workflow {
val delegates = mutableListOf<Delegate>()
if (!monitorIds.isNullOrEmpty()) {
delegates.add(Delegate(1, monitorIds[0]))
for (i in 1 until monitorIds.size) {
// Order of monitors in workflow will be the same like forwarded meaning that the first monitorId will be used as second monitor chained finding
delegates.add(Delegate(i + 1, monitorIds [i], ChainedMonitorFindings(monitorIds[i - 1])))
}
}

return Workflow(
id = id,
name = name,
enabled = enabled,
schedule = schedule,
lastUpdateTime = lastUpdateTime,
enabledTime = enabledTime,
workflowType = WorkflowType.COMPOSITE,
user = user,
inputs = listOf(CompositeInput(Sequence(delegates)))
)
}

fun randomWorkflowWithDelegates(
id: String = Workflow.NO_ID,
delegates: List<Delegate>,
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = randomBoolean(),
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
): Workflow {
return Workflow(
id = id,
name = name,
enabled = enabled,
schedule = schedule,
lastUpdateTime = lastUpdateTime,
enabledTime = enabledTime,
workflowType = WorkflowType.COMPOSITE,
user = user,
inputs = listOf(CompositeInput(Sequence(delegates)))
)
}

fun randomQueryLevelTrigger(
id: String = UUIDs.base64UUID(),
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
Expand Down Expand Up @@ -699,3 +760,7 @@ fun assertUserNull(map: Map<String, Any?>) {
fun assertUserNull(monitor: Monitor) {
assertNull("User is not null", monitor.user)
}

fun assertUserNull(workflow: Workflow) {
assertNull("User is not null", workflow.user)
}
Loading

0 comments on commit 051a819

Please sign in to comment.