Skip to content

Commit

Permalink
optimize to fetch only fields relevant to doc level queries in doc le…
Browse files Browse the repository at this point in the history
…vel monitor

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 22, 2024
1 parent f643454 commit 0dca765
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT,
LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE,
LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED,
DestinationSettings.EMAIL_USERNAME,
DestinationSettings.EMAIL_PASSWORD,
DestinationSettings.ALLOW_LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,28 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}
}

val fieldsToBeQueried = mutableSetOf<String>()

for (it in queries) {
if (it.queryFieldNames.isEmpty()) {
fieldsToBeQueried.clear()
logger.debug(
"Monitor ${monitor.id} : " +
"Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " +
"Cannot optimize monitor to fetch only query-relevant fields. " +
"Querying entire doc source."
)
break
}
fieldsToBeQueried.addAll(it.queryFieldNames)
}
if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToBeQueried.isNotEmpty()) {
logger.debug(
"Monitor ${monitor.id} Querying only fields " +
"${fieldsToBeQueried.joinToString()} instead of entire _source of documents"
)
}

// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

Expand All @@ -252,6 +274,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries,
updatedIndexNames,
concreteIndicesSeenSoFar,
ArrayList(fieldsToBeQueried)
)
}
}
Expand Down Expand Up @@ -683,6 +706,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries: MutableMap<String, MutableList<String>>,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
fieldsToBeQueried: List<String>,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand All @@ -697,8 +721,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shard,
prevSeqNo,
maxSeqNo,
null,
docIds
docIds,
fieldsToBeQueried
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
Expand Down Expand Up @@ -789,19 +813,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null,
fieldsToFetch: List<String>,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
}
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (query != null) {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
Expand All @@ -816,6 +836,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000)
)
.preference(Preference.PRIMARY_FIRST.type())

if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
request.source().fetchField(field)
}
}
val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
logger.error("Failed search shard. Response: $response")
Expand Down Expand Up @@ -906,7 +933,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
val sourceMap = hit.sourceAsMap
val sourceMap = if (hit.hasSource()) {
hit.sourceAsMap
} else {
constructSourceMapFromFieldsInHit(hit)
}
transformDocumentFieldNames(
sourceMap,
conflictingFields,
Expand All @@ -927,6 +958,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
})
}

private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap<String, Any> {
if (hit.fields == null)
return mutableMapOf()
val sourceMap: MutableMap<String, Any> = mutableMapOf()
for (field in hit.fields) {
if (field.value.values != null && field.value.values.isNotEmpty())
if (field.value.values.size == 1) {
sourceMap[field.key] = field.value.values[0]
} else sourceMap[field.key] = field.value.values
}
return sourceMap
}

/**
* Traverses document fields in leaves recursively and appends [fieldNameSuffixIndex] to field names with same names
* but different mappings & [fieldNameSuffixPattern] to field names which have unique names.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT
Expand Down Expand Up @@ -182,6 +183,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.findingsIndexBatchSize = it
}

monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) {
monitorCtx.fetchOnlyQueryFieldNames = it
}

return this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,17 @@ class AlertingSettings {
*/
val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting(
"plugins.alerting.monitor.percolate_query_max_num_docs_in_memory",
300000, 1000,
50000, 1000,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
* Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries.
* Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards.
*/
val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting(
"plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled",
true,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
import org.opensearch.common.xcontent.json.JsonXContent
Expand Down Expand Up @@ -75,6 +76,152 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with correct field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()

val docQuery =
DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field"))
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() {

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(0, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 0, alerts.size)
}

fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() {
adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false")
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val index = createTestIndex()
// using wrong field name
val docQuery = DocLevelQuery(
query = "test_field:\"us-west-2\"",
name = "3",
fields = listOf(),
queryFieldNames = listOf("wrong_field")
)
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)

indexDoc(index, "1", testDoc)

val response = executeMonitor(monitor, params = DRYRUN_MONITOR)

val output = entityAsMap(response)
assertEquals(monitor.name, output["monitor_name"])

assertEquals(1, output.objectMap("trigger_results").values.size)

for (triggerResult in output.objectMap("trigger_results").values) {
assertEquals(1, triggerResult.objectMap("action_results").values.size)
for (alertActionResult in triggerResult.objectMap("action_results").values) {
for (actionResult in alertActionResult.values) {
@Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map<String, Map<String, String>>)["output"]
as Map<String, String>
assertEquals("Hello ${monitor.name}", actionOutput["subject"])
assertEquals("Hello ${monitor.name}", actionOutput["message"])
}
}
}

val alerts = searchAlerts(monitor)
assertEquals("Alert saved for test monitor", 1, alerts.size)
}

fun `test execute monitor returns search result with dryrun`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Loading

0 comments on commit 0dca765

Please sign in to comment.