Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.8] Adds transport layer actions for CRUD workflows (#934) #941

Merged
merged 1 commit into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ dependencies {
testImplementation "org.mockito:mockito-core:${versions.mockito}"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-painless:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-mustache-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
44 changes: 41 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
Expand Down Expand Up @@ -38,25 +39,31 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.alerting.workflow.WorkflowRunnerService
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
Expand All @@ -78,6 +85,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
Expand Down Expand Up @@ -140,6 +148,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

lateinit var runner: MonitorRunnerService
lateinit var workflowRunner: WorkflowRunnerService
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
Expand Down Expand Up @@ -191,8 +200,11 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java)

ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
)
}

Expand All @@ -204,7 +216,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
DocumentLevelTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}

Expand Down Expand Up @@ -239,6 +252,22 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerConsumers()
.registerDestinationSettings()
workflowRunner = WorkflowRunnerService
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerIndexNameExpressionResolver(indexNameExpressionResolver)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
Expand All @@ -254,6 +283,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
settings
)

WorkflowMetadataService.initialize(
client,
clusterService,
xContentRegistry,
settings
)

DeleteMonitorService.initialize(client)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -59,7 +60,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand Down Expand Up @@ -118,7 +120,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitor,
periodStart,
periodEnd,
monitorResult.inputResults
monitorResult.inputResults,
workflowRunContext
)
if (firstIteration) {
firstPageOfInputResults = inputResults
Expand Down Expand Up @@ -154,7 +157,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
!dryrun && monitor.id != Monitor.NO_ID,
workflowRunContext
)
} else {
emptyList()
Expand Down Expand Up @@ -350,7 +354,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowRunContext: WorkflowRunContext? = null
): List<String> {
monitor.inputs.forEach { input ->
if (input is SearchInput) {
Expand All @@ -361,14 +366,14 @@ object BucketLevelMonitorRunner : MonitorRunner() {
for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
when (aggFactory) {
is CompositeAggregationBuilder -> {
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
var groupByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
val sources = aggFactory.sources()
for (source in sources) {
if (grouByFields > 0) {
if (groupByFields > 0) {
logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}")
return listOf()
}
grouByFields++
groupByFields++
fieldName = source.field()
}
}
Expand Down Expand Up @@ -409,7 +414,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
sr.source().query(queryBuilder)
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, workflowRunContext?.executionId)
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
Expand All @@ -422,7 +427,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
searchResponse: SearchResponse,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null
): List<String> {
val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
for (hit in searchResponse.hits.hits) {
Expand All @@ -441,7 +447,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorName = monitor.name,
index = it.key,
timestamp = Instant.now(),
docLevelQueries = listOf()
docLevelQueries = listOf(),
executionId = workflowExecutionId
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
Expand Down Expand Up @@ -70,7 +71,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
Expand All @@ -96,7 +98,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = false,
skipIndex = isTempMonitor
skipIndex = isTempMonitor,
workflowRunContext?.workflowMetadataId
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
Expand Down Expand Up @@ -135,6 +138,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
Expand Down Expand Up @@ -169,7 +175,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)
val matchingDocs = getMatchingDocs(
monitor,
monitorCtx,
docExecutionContext,
indexName,
matchingDocIdsPerIndex?.get(indexName)
)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
Expand Down Expand Up @@ -226,7 +238,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
dryrun,
workflowRunContext?.executionId
)
}
}
Expand Down Expand Up @@ -298,7 +311,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
dryrun: Boolean,
workflowExecutionId: String? = null
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
Expand All @@ -309,7 +323,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, !dryrun && monitor.id != Monitor.NO_ID)
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
workflowExecutionId
)
findings.add(findingId)

if (triggerResult.triggeredDocs.contains(it.key)) {
Expand Down Expand Up @@ -379,7 +400,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
Expand All @@ -392,7 +414,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now()
timestamp = Instant.now(),
executionId = workflowExecutionId
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down Expand Up @@ -514,7 +537,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String
index: String,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
Expand All @@ -530,7 +554,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard,
prevSeqNo,
maxSeqNo,
null
null,
docIds
)

if (hits.hits.isNotEmpty()) {
Expand All @@ -549,7 +574,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?
query: String?,
docIds: List<String>? = null
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand All @@ -561,6 +587,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}

val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand Down
Loading