QueryIndex rollover when field mapping limit is reached #725

Merged: 19 commits, Dec 31, 2022
Changes from 4 commits
@@ -37,6 +37,8 @@ import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
@@ -134,6 +136,7 @@ object MonitorMetadataService :
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
MonitorMetadata.parse(xcp)
} else {
null
@@ -84,7 +84,7 @@ data class MonitorMetadata(
var lastRunContext: Map<String, Any> = mapOf()
var sourceToQueryIndexMapping: MutableMap<String, String> = mutableMapOf()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
@@ -6,22 +6,11 @@
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.client.Client
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
@@ -122,23 +111,3 @@ fun defaultToPerExecutionAction(

return false
}

suspend fun updateMonitorMetadata(client: Client, settings: Settings, monitorMetadata: MonitorMetadata): IndexResponse {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(monitorMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(monitorMetadata.id)
.timeout(AlertingSettings.INDEX_TIMEOUT.get(settings))

return client.suspendUntil { client.index(indexRequest, it) }
}

suspend fun updateMonitor(client: Client, settings: Settings, monitor: Monitor): IndexResponse {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(monitor.toXContentWithUser(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(monitor.id)
.timeout(AlertingSettings.INDEX_TIMEOUT.get(settings))

return client.suspendUntil { client.index(indexRequest, it) }
}
@@ -11,6 +11,7 @@ import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
@@ -42,7 +43,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
const val PROPERTIES = "properties"
const val NESTED = "nested"
const val TYPE = "type"
const val INDEX_PATTERN_SUFIX = "-000001"
const val INDEX_PATTERN_SUFFIX = "-000001"
Member: Is the number getting incremented during rollovers? If so, where?

Contributor Author: It is handled by core's _rollover API here.

Member: Thanks.
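For readers unfamiliar with the mechanism, here is a minimal illustrative sketch (not part of this PR's diff) of a single rollover call. It assumes the standard OpenSearch RolloverRequest/RolloverResponse client API and the plugin's suspendUntil coroutine helper; the function name rolloverOnce is hypothetical.

import org.opensearch.action.admin.indices.rollover.RolloverRequest
import org.opensearch.action.admin.indices.rollover.RolloverResponse
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.client.Client

// Rolls the queryIndex alias over once. With no explicit new index name, core derives the next
// name by incrementing the trailing number of the current write index, which is why the series
// starts at INDEX_PATTERN_SUFFIX ("-000001") and advances to "-000002", "-000003", and so on.
suspend fun rolloverOnce(client: Client, alias: String): String {
    val request = RolloverRequest(alias, null)
    val response: RolloverResponse = client.suspendUntil {
        admin().indices().rolloverIndex(request, it)
    }
    return response.newIndex
}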

@JvmStatic
fun docLevelQueriesMappings(): String {
return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText()
@@ -51,8 +52,20 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

suspend fun initDocLevelQueryIndex(): Boolean {
if (!docLevelQueryIndexExists()) {
// Since we changed queryIndex to be alias now, for backwards compatibility, we have to delete index with same name
// as our alias, to avoid name clash.
if (clusterService.state().metadata.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) {
val acknowledgedResponse: AcknowledgedResponse = client.suspendUntil {
admin().indices().delete(DeleteIndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX), it)
}
if (!acknowledgedResponse.isAcknowledged) {
val errorMessage = "Deletion of old queryIndex [${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}] index is not acknowledged!"
log.error(errorMessage)
throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR))
}
}
Member: Just to confirm, even when it gets deleted here, we don't care about the old data since we will recreate it anyway, right?

Contributor Author: Yes. The queryIndex is checked for existence and recreated if missing on every monitor update/execute API call and on every monitor execution by the job scheduler.
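For illustration, a rough caller-side sketch of that lifecycle follows. It is hypothetical: the initDocLevelQueryIndex(dataSources) overload and its Boolean return are assumed from the hunk above, and the import paths may differ.

import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.commons.alerting.model.Monitor

// Invoked from the monitor index/execute paths and from scheduled runs: effectively a no-op
// when the alias-backed queryIndex already exists, otherwise it recreates "<queryIndex>-000001"
// with the doc-level-queries mappings and points the alias at it.
suspend fun ensureQueryIndex(queries: DocLevelMonitorQueries, monitor: Monitor): Boolean {
    return queries.initDocLevelQueryIndex(monitor.dataSources)
}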

val alias = ScheduledJob.DOC_LEVEL_QUERIES_INDEX
val indexPattern = ScheduledJob.DOC_LEVEL_QUERIES_INDEX + INDEX_PATTERN_SUFIX
val indexPattern = ScheduledJob.DOC_LEVEL_QUERIES_INDEX + INDEX_PATTERN_SUFFIX
val indexRequest = CreateIndexRequest(indexPattern)
.mapping(docLevelQueriesMappings())
.alias(Alias(alias))
Expand All @@ -77,8 +90,20 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
if (dataSources.queryIndex == ScheduledJob.DOC_LEVEL_QUERIES_INDEX) {
return initDocLevelQueryIndex()
}
// Since we changed queryIndex to be alias now, for backwards compatibility, we have to delete index with same name
// as our alias, to avoid name clash.
if (clusterService.state().metadata.hasIndex(dataSources.queryIndex)) {
val acknowledgedResponse: AcknowledgedResponse = client.suspendUntil {
admin().indices().delete(DeleteIndexRequest(dataSources.queryIndex), it)
}
if (!acknowledgedResponse.isAcknowledged) {
val errorMessage = "Deletion of old queryIndex [${dataSources.queryIndex}] index is not acknowledged!"
log.error(errorMessage)
throw AlertingException.wrap(OpenSearchStatusException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR))
}
}
val alias = dataSources.queryIndex
val indexPattern = dataSources.queryIndex + INDEX_PATTERN_SUFIX
val indexPattern = dataSources.queryIndex + INDEX_PATTERN_SUFFIX
if (!clusterService.state().metadata.hasAlias(alias)) {
val indexRequest = CreateIndexRequest(indexPattern)
.mapping(docLevelQueriesMappings())
@@ -340,7 +365,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

private suspend fun rolloverQueryIndex(monitor: Monitor): String? {
val queryIndex = monitor.dataSources.queryIndex
val queryIndexPattern = monitor.dataSources.queryIndex + INDEX_PATTERN_SUFIX
val queryIndexPattern = monitor.dataSources.queryIndex + INDEX_PATTERN_SUFFIX

val request = RolloverRequest(queryIndex, null)
request.createIndexRequest.index(queryIndexPattern)
@@ -13,8 +13,6 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.fieldcaps.FieldCapabilitiesAction
import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.SearchMonitorAction
@@ -174,9 +172,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
client().admin().indices().putMapping(
PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field")
)
val resp = client().admin().indices().execute(
FieldCapabilitiesAction.INSTANCE, FieldCapabilitiesRequest().indices(index).fields("*")
).get()
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
val id = monitorResponse.id
@@ -62,7 +62,7 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(createRandomMonitor())
assertIndexExists(AlertIndices.ALERT_INDEX)
assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5)
verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 6)
verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 4)
verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 4)
}
@@ -86,7 +86,7 @@
val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
executeMonitor(trueMonitor.id)
assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX)
verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5)
verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 6)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 1)
}

@@ -80,7 +80,13 @@ class XContentTests : OpenSearchTestCase() {
}

fun `test MonitorMetadata`() {
val monitorMetadata = MonitorMetadata("monitorId-metadata", 0L, 0L, "monitorId", emptyList(), emptyMap(), mutableMapOf())
val monitorMetadata = MonitorMetadata(
id = "monitorId-metadata",
monitorId = "monitorId",
lastActionExecutionTimes = emptyList(),
lastRunContext = emptyMap(),
sourceToQueryIndexMapping = mutableMapOf()
)
val monitorMetadataString = monitorMetadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string()
val parsedMonitorMetadata = MonitorMetadata.parse(parser(monitorMetadataString))
assertEquals("Round tripping MonitorMetadata doesn't work", monitorMetadata, parsedMonitorMetadata)
6 changes: 5 additions & 1 deletion core/src/main/resources/mappings/scheduled-jobs.json
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 5
"schema_version": 6
},
"properties": {
"monitor": {
@@ -512,6 +512,10 @@
"last_run_context": {
"type": "object",
"enabled": false
},
"source_to_query_index_mapping": {
"type": "object",
"enabled": false
}
}
}