Skip to content

Commit

Permalink
[BUG] ExecuteMonitor inserting metadata doc during dry run (opensearc…
Browse files Browse the repository at this point in the history
…h-project#758) (opensearch-project#777)

* execute monitor bugfix

Signed-off-by: Petar Dzepina <[email protected]>

* added IT

Signed-off-by: Petar Dzepina <[email protected]>

* fixed created retval when skipIndex=true

Signed-off-by: Petar Dzepina <[email protected]>

---------

Signed-off-by: Petar Dzepina <[email protected]>
(cherry picked from commit ce7094a925333bacebd337b82322b19ec119210b)

Co-authored-by: Petar Dzepina <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and petardz authored Feb 3, 2023
1 parent a69d762 commit aeb2f2d
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
dryrun: Boolean
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

try {
Expand All @@ -84,13 +85,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, createWithRunContext = false)
var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = false,
skipIndex = isTempMonitor
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,23 @@ object MonitorMetadataService :
}
}

suspend fun getOrCreateMetadata(monitor: Monitor, createWithRunContext: Boolean = true): Pair<MonitorMetadata, Boolean> {
suspend fun getOrCreateMetadata(
monitor: Monitor,
createWithRunContext: Boolean = true,
skipIndex: Boolean = false
): Pair<MonitorMetadata, Boolean> {
try {
val created = true
val metadata = getMetadata(monitor)
return if (metadata != null) {
metadata to !created
} else {
val newMetadata = createNewMetadata(monitor, createWithRunContext = createWithRunContext)
upsertMetadata(newMetadata, updating = false) to created
if (skipIndex) {
newMetadata to created
} else {
upsertMetadata(newMetadata, updating = false) to created
}
}
} catch (e: Exception) {
throw AlertingException.wrap(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class TransportExecuteMonitorAction @Inject constructor(
docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources)
log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created")
}
val (metadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor)
val (metadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, skipIndex = true)
docLevelMonitorQueries.indexDocLevelQueries(
monitor,
monitor.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
}
}

refreshAllIndices()

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
Expand Down Expand Up @@ -342,6 +343,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test execute monitor without create when no monitors exists`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customQueryIndex = "custom_alerts_index"
val analyzer = "whitespace"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
var executeMonitorResponse = executeMonitor(monitor, null)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

assertIndexNotExists(SCHEDULED_JOBS_INDEX)

val createMonitorResponse = createMonitor(monitor)

assertIndexExists(SCHEDULED_JOBS_INDEX)

indexDoc(index, "1", testDoc)

executeMonitorResponse = executeMonitor(monitor, createMonitorResponse?.id, dryRun = false)

Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
Assert.assertEquals(
(executeMonitorResponse.monitorRunResult.triggerResults.iterator().next().value as DocumentLevelTriggerRunResult)
.triggeredDocs.size,
1
)
}

fun `test execute monitor with custom query index and custom field mappings`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
return getIndexResponse.indices().toList()
}

protected fun executeMonitor(monitor: Monitor, id: String, dryRun: Boolean = true): ExecuteMonitorResponse? {
protected fun executeMonitor(monitor: Monitor, id: String?, dryRun: Boolean = true): ExecuteMonitorResponse? {
val request = ExecuteMonitorRequest(dryRun, TimeValue(Instant.now().toEpochMilli()), id, monitor)
return client().execute(ExecuteMonitorAction.INSTANCE, request).get()
}
Expand Down

0 comments on commit aeb2f2d

Please sign in to comment.