Skip to content

Commit

Permalink
Fix few ktlint failures.
Browse files Browse the repository at this point in the history
Signed-off-by: Saurabh Singh <[email protected]>
  • Loading branch information
getsaurabh02 committed Mar 25, 2022
1 parent a2f1e87 commit 28ba8e6
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.elasticapi.InjectorContextElement
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
Expand All @@ -27,11 +27,11 @@ import org.opensearch.alerting.util.isAllowed
import org.opensearch.common.Strings
import java.time.Instant

object BucketLevelMonitorRunner: MonitorRunner {
object BucketLevelMonitorRunner : MonitorRunner {
private val logger = LogManager.getLogger(javaClass)

override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean):
MonitorRunResult<BucketLevelTriggerRunResult> {
MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")

Expand Down Expand Up @@ -112,7 +112,7 @@ object BucketLevelMonitorRunner: MonitorRunner {
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList()
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList()
).toMutableMap()
val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())
Expand All @@ -134,9 +134,9 @@ object BucketLevelMonitorRunner: MonitorRunner {
// Store deduped and new Alerts to accumulate across pages
if (!nextAlerts.containsKey(trigger.id)) {
nextAlerts[trigger.id] = mutableMapOf(
AlertCategory.DEDUPED to mutableListOf(),
AlertCategory.NEW to mutableListOf(),
AlertCategory.COMPLETED to mutableListOf()
AlertCategory.DEDUPED to mutableListOf(),
AlertCategory.NEW to mutableListOf(),
AlertCategory.COMPLETED to mutableListOf()
)
}
nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)?.addAll(dedupedAlerts)
Expand All @@ -158,8 +158,8 @@ object BucketLevelMonitorRunner: MonitorRunner {
// Filter ACKNOWLEDGED Alerts from the deduped list so they do not have Actions executed for them.
// New Alerts are ignored since they cannot be acknowledged yet.
val dedupedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.DEDUPED)
?.filterNot { it.state == Alert.State.ACKNOWLEDGED }?.toMutableList()
?: mutableListOf()
?.filterNot { it.state == Alert.State.ACKNOWLEDGED }?.toMutableList()
?: mutableListOf()
// Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts)
val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
Expand All @@ -174,11 +174,11 @@ object BucketLevelMonitorRunner: MonitorRunner {
val triggerResult = triggerResults[trigger.id]!!
val monitorOrTriggerError = monitorResult.error ?: triggerResult.error
val shouldDefaultToPerExecution = defaultToPerExecutionAction(
monitorCtx,
monitorId = monitor.id,
triggerId = trigger.id,
totalActionableAlertCount = dedupedAlerts.size + newAlerts.size + completedAlerts.size,
monitorOrTriggerError = monitorOrTriggerError
monitorCtx,
monitorId = monitor.id,
triggerId = trigger.id,
totalActionableAlertCount = dedupedAlerts.size + newAlerts.size + completedAlerts.size,
monitorOrTriggerError = monitorOrTriggerError
)
for (action in trigger.actions) {
// ActionExecutionPolicy should not be null for Bucket-Level Monitors since it has a default config when not set explicitly
Expand All @@ -188,7 +188,7 @@ object BucketLevelMonitorRunner: MonitorRunner {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorOrTriggerError
alertCategory, alert, triggerCtx, monitorOrTriggerError
)
// AggregationResultBucket should not be null here
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
Expand Down Expand Up @@ -218,10 +218,10 @@ object BucketLevelMonitorRunner: MonitorRunner {
continue

val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
newAlerts = newAlerts,
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
dedupedAlerts = dedupedAlerts,
newAlerts = newAlerts,
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
val actionResult = this.runAction(action, actionCtx, monitorCtx, dryrun)
// If there was an error during trigger execution then the Alerts to be updated are the current Alerts since the state
Expand Down Expand Up @@ -250,10 +250,10 @@ object BucketLevelMonitorRunner: MonitorRunner {
val bucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
val actionResults = triggerResult.actionResultsMap.getOrDefault(bucketKeysHash, emptyMap<String, ActionRunResult>())
monitorCtx.alertService!!.updateActionResultsForBucketLevelAlert(
alert.copy(lastNotificationTime = MonitorRunnerService.currentTime()),
actionResults,
// TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action
monitorResult.alertError() ?: triggerResult.alertError()
alert.copy(lastNotificationTime = MonitorRunnerService.currentTime()),
actionResults,
// TODO: Update BucketLevelTriggerRunResult.alertError() to retrieve error based on the first failed Action
monitorResult.alertError() ?: triggerResult.alertError()
)
}

Expand Down Expand Up @@ -286,10 +286,10 @@ object BucketLevelMonitorRunner: MonitorRunner {

val destinationCtx = monitorCtx.destinationContextFactory!!.getDestinationContext(destination)
actionOutput[Action.MESSAGE_ID] = destination.publish(
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!,
destinationCtx,
monitorCtx.hostDenyList
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!,
destinationCtx,
monitorCtx.hostDenyList
)
}
}
Expand All @@ -300,17 +300,17 @@ object BucketLevelMonitorRunner: MonitorRunner {
}

private fun defaultToPerExecutionAction(
monitorCtx: MonitorRunnerExecutionContext,
monitorId: String,
triggerId: String,
totalActionableAlertCount: Int,
monitorOrTriggerError: Exception?
monitorCtx: MonitorRunnerExecutionContext,
monitorId: String,
triggerId: String,
totalActionableAlertCount: Int,
monitorOrTriggerError: Exception?
): Boolean {
// If the monitorId or triggerResult has an error, then also default to PER_EXECUTION to communicate the error
if (monitorOrTriggerError != null) {
logger.debug(
"Trigger [$triggerId] in monitor [$monitorId] encountered an error. Defaulting to " +
"[${ActionExecutionScope.Type.PER_EXECUTION}] for action execution to communicate error."
"Trigger [$triggerId] in monitor [$monitorId] encountered an error. Defaulting to " +
"[${ActionExecutionScope.Type.PER_EXECUTION}] for action execution to communicate error."
)
return true
}
Expand All @@ -322,9 +322,9 @@ object BucketLevelMonitorRunner: MonitorRunner {
// PER_EXECUTION for less intrusive Actions
if (totalActionableAlertCount > monitorCtx.maxActionableAlertCount) {
logger.debug(
"The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " +
"which exceeds the maximum of [$(monitorCtx.maxActionableAlertCount)]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " +
"for action execution."
"The total actionable alerts for trigger [$triggerId] in monitor [$monitorId] is [$totalActionableAlertCount] " +
"which exceeds the maximum of [$(monitorCtx.maxActionableAlertCount)]. Defaulting to [${ActionExecutionScope.Type.PER_EXECUTION}] " +
"for action execution."
)
return true
}
Expand All @@ -333,10 +333,10 @@ object BucketLevelMonitorRunner: MonitorRunner {
}

private fun getActionContextForAlertCategory(
alertCategory: AlertCategory,
alert: Alert,
ctx: BucketLevelTriggerExecutionContext,
error: Exception?
alertCategory: AlertCategory,
alert: Alert,
ctx: BucketLevelTriggerExecutionContext,
error: Exception?
): BucketLevelTriggerExecutionContext {
return when (alertCategory) {
AlertCategory.DEDUPED ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.elasticapi.string
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.Monitor
Expand All @@ -34,11 +34,11 @@ import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap

object DocumentReturningMonitorRunner: MonitorRunner {
object DocumentReturningMonitorRunner : MonitorRunner {
private val logger = LogManager.getLogger(javaClass)

override suspend fun runMonitor(monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, dryrun: Boolean):
MonitorRunResult<DocumentLevelTriggerRunResult> {
MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.info("Document-level-monitor is running ...")
val monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
try {
Expand Down Expand Up @@ -105,13 +105,13 @@ object DocumentReturningMonitorRunner: MonitorRunner {
}

private fun runForEachDocTrigger(
monitorCtx: MonitorRunnerExecutionContext,
trigger: DocumentLevelTrigger,
monitor: Monitor,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
monitorCtx: MonitorRunnerExecutionContext,
trigger: DocumentLevelTrigger,
monitor: Monitor,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
) {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)
Expand All @@ -133,25 +133,25 @@ object DocumentReturningMonitorRunner: MonitorRunner {
}

fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
docLevelQuery: DocLevelQuery,
matchingDocIds: Set<String>,
trigger: DocumentLevelTrigger
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
docLevelQuery: DocLevelQuery,
matchingDocIds: Set<String>,
trigger: DocumentLevelTrigger
): String {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = matchingDocIds.joinToString(","),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = trigger.id,
triggerName = trigger.name
id = UUID.randomUUID().toString(),
relatedDocId = matchingDocIds.joinToString(","),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = trigger.id,
triggerName = trigger.name
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand All @@ -160,8 +160,8 @@ object DocumentReturningMonitorRunner: MonitorRunner {

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)

monitorCtx.client!!.index(indexRequest).actionGet()
return finding.id
Expand Down Expand Up @@ -207,16 +207,16 @@ object DocumentReturningMonitorRunner: MonitorRunner {
*/
private fun getMaxSeqNo(monitorCtx: MonitorRunnerExecutionContext, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.sort("_seq_no", SortOrder.DESC)
.seqNoAndPrimaryTerm(true)
.query(QueryBuilders.matchAllQuery())
.size(1)
)
val response: SearchResponse = monitorCtx.client!!.search(request).actionGet()
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
Expand All @@ -233,10 +233,10 @@ object DocumentReturningMonitorRunner: MonitorRunner {
}

private fun runForEachQuery(
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
query: DocLevelQuery,
index: String
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
query: DocLevelQuery,
index: String
): Set<String> {
val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int
val matchingDocs = mutableSetOf<String>()
Expand All @@ -249,12 +249,12 @@ object DocumentReturningMonitorRunner: MonitorRunner {
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

val hits: SearchHits = searchShard(
monitorCtx,
index,
shard,
docExecutionCtx.lastRunContext[shard].toString().toLongOrNull(),
maxSeqNo,
query.query
monitorCtx,
index,
shard,
docExecutionCtx.lastRunContext[shard].toString().toLongOrNull(),
maxSeqNo,
query.query
)
logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

Expand All @@ -279,14 +279,14 @@ object DocumentReturningMonitorRunner: MonitorRunner {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))

val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
.indices(index)
.preference("_shards:$shard")
.source(
SearchSourceBuilder()
.version(true)
.query(boolQueryBuilder)
.size(10000) // fixme: make this configurable.
)
logger.info("Request: $request")
val response: SearchResponse = monitorCtx.client!!.search(request).actionGet()
if (response.status() !== RestStatus.OK) {
Expand Down
Loading

0 comments on commit 28ba8e6

Please sign in to comment.