diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index cd29f2671..29666bd2b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -18,6 +18,9 @@ import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse +import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest +import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest @@ -29,11 +32,13 @@ import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING import org.opensearch.rest.RestStatus private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) @@ -45,10 +50,17 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ const val NESTED = "nested" const val TYPE = "type" const val INDEX_PATTERN_SUFFIX = "-000001" + const val QUERY_INDEX_BASE_FIELDS_COUNT = 8 // 3 fields we defined and 5 builtin additional metadata fields @JvmStatic fun docLevelQueriesMappings(): String { return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() } + fun docLevelQueriesSettings(): Settings { + return Settings.builder().loadFromSource( + DocLevelMonitorQueries::class.java.classLoader.getResource("settings/doc-level-queries.json").readText(), + XContentType.JSON + ).build() + } } suspend fun initDocLevelQueryIndex(): Boolean { @@ -70,10 +82,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val indexRequest = CreateIndexRequest(indexPattern) .mapping(docLevelQueriesMappings()) .alias(Alias(alias)) - .settings( - Settings.builder().put("index.hidden", true) - .build() - ) + .settings(docLevelQueriesSettings()) return try { val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) } createIndexResponse.isAcknowledged @@ -321,6 +330,8 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ updateMappingRequest.source(mapOf("properties" to updatedProperties)) var updateMappingResponse = AcknowledgedResponse(false) try { + // Adjust max field limit in mappings for query index, if needed. + checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) updateMappingResponse = client.suspendUntil { client.admin().indices().putMapping(updateMappingRequest, it) } @@ -331,7 +342,10 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // If we reached limit for total number of fields in mappings, do a rollover here if (unwrappedException.message?.contains("Limit of total fields") == true) { try { + // Do queryIndex rollover targetQueryIndex = rolloverQueryIndex(monitor) + // Adjust max field limit in mappings for new index. + checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) // PUT mappings to newly created index val updateMappingRequest = PutMappingRequest(targetQueryIndex) updateMappingRequest.source(mapOf("properties" to updatedProperties)) @@ -371,14 +385,41 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return Pair(updateMappingResponse, targetQueryIndex) } - private suspend fun rolloverQueryIndex(monitor: Monitor): String? { + /** + * Adjusts max field limit index setting for query index if source index has higher limit. + * This will prevent max field limit exception, when applying mappings to query index + */ + private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) { + val getSettingsResponse: GetSettingsResponse = client.suspendUntil { + admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex, concreteQueryIndex), it) + } + val sourceIndexLimit = + getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L + val queryIndexLimit = + getSettingsResponse.getSetting(concreteQueryIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L + // Our query index initially has 3 fields we defined and 5 more builtin metadata fields in mappings so we have to account for that + if (sourceIndexLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { + val updateSettingsResponse: AcknowledgedResponse = client.suspendUntil { + admin().indices().updateSettings( + UpdateSettingsRequest(concreteQueryIndex).settings( + Settings.builder().put( + INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, sourceIndexLimit + QUERY_INDEX_BASE_FIELDS_COUNT + ) + ), + it + ) + } + } + } + + private suspend fun rolloverQueryIndex(monitor: Monitor): String { val queryIndex = monitor.dataSources.queryIndex val queryIndexPattern = monitor.dataSources.queryIndex + INDEX_PATTERN_SUFFIX val request = RolloverRequest(queryIndex, null) request.createIndexRequest.index(queryIndexPattern) .mapping(docLevelQueriesMappings()) - .settings(Settings.builder().put("index.hidden", true).build()) + .settings(docLevelQueriesSettings()) val response: RolloverResponse = client.suspendUntil { client.admin().indices().rolloverIndex(request, it) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 6ff15fe18..438a1f82f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -20,6 +20,7 @@ import org.opensearch.alerting.action.SearchMonitorRequest import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.transport.AlertingSingleNodeTestCase +import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest @@ -34,6 +35,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.DOC_LEVEL_QUERIES_INDEX import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.Table +import org.opensearch.index.mapper.MapperService import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder @@ -947,7 +949,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { fun `test queryIndex rollover and delete monitor success`() { val testSourceIndex = "test_source_index" - createIndex(testSourceIndex, Settings.EMPTY) + createIndex(testSourceIndex, Settings.builder().put("index.mapping.total_fields.limit", "10000").build()) val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery)) @@ -959,7 +961,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { // This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api val docPayload: StringBuilder = StringBuilder(100000) docPayload.append("{") - for (i in 1..330) { + for (i in 1..3300) { docPayload.append(""" "id$i.somefield.somefield$i":$i,""") } docPayload.append("\"test_field\" : \"us-west-2\" }") @@ -1136,16 +1138,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } /** - * 1. Create monitor with input source_index with 900 fields in mappings - can fit 1 in queryIndex - * 2. Update monitor and change input source_index to a new one with 900 fields in mappings + * 1. Create monitor with input source_index with 9000 fields in mappings - can fit 1 in queryIndex + * 2. Update monitor and change input source_index to a new one with 9000 fields in mappings * 3. Expect queryIndex rollover resulting in 2 backing indices * 4. Delete monitor and expect that all backing indices are deleted * */ fun `test updating monitor no execution queryIndex rolling over`() { val testSourceIndex1 = "test_source_index1" val testSourceIndex2 = "test_source_index2" - createIndex(testSourceIndex1, Settings.EMPTY) - createIndex(testSourceIndex2, Settings.EMPTY) + createIndex(testSourceIndex1, Settings.builder().put("index.mapping.total_fields.limit", "10000").build()) + createIndex(testSourceIndex2, Settings.builder().put("index.mapping.total_fields.limit", "10000").build()) val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -1153,10 +1155,10 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { inputs = listOf(docLevelInput), triggers = listOf(trigger) ) - // This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api + // This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api val docPayload: StringBuilder = StringBuilder(100000) docPayload.append("{") - for (i in 1..899) { + for (i in 1..9000) { docPayload.append(""" "id$i":$i,""") } docPayload.append("\"test_field\" : \"us-west-2\" }") @@ -1191,6 +1193,48 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals(0, getIndexResponse.indices.size) } + fun `test queryIndex gets increased max fields in mappings`() { + val testSourceIndex = "test_source_index" + createIndex(testSourceIndex, Settings.builder().put("index.mapping.total_fields.limit", "10000").build()) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // This doc should create 12000 fields in index mapping. It's easier to add mappings like this then via api + val docPayload: StringBuilder = StringBuilder(100000) + docPayload.append("{") + for (i in 1..9998) { + docPayload.append(""" "id$i":$i,""") + } + docPayload.append("\"test_field\" : \"us-west-2\" }") + // Indexing docs here as an easier means to set index mappings + indexDoc(testSourceIndex, "1", docPayload.toString()) + // Create monitor + var monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + // Expect queryIndex to rollover after setting new source_index with close to limit amount of fields in mappings + var getIndexResponse: GetIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(1, getIndexResponse.indices.size) + val field_max_limit = getIndexResponse + .getSetting(DOC_LEVEL_QUERIES_INDEX + "-000001", MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key).toInt() + + assertEquals(10000 + DocLevelMonitorQueries.QUERY_INDEX_BASE_FIELDS_COUNT, field_max_limit) + + deleteMonitor(monitorResponse.id) + waitUntil { + getIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + return@waitUntil getIndexResponse.indices.isEmpty() + } + assertEquals(0, getIndexResponse.indices.size) + } + fun `test queryIndex bwc when index was not an alias`() { createIndex(DOC_LEVEL_QUERIES_INDEX, Settings.builder().put("index.hidden", true).build()) assertIndexExists(DOC_LEVEL_QUERIES_INDEX) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index 324009ec0..aa6361a87 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -23,6 +23,8 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.rest.RestStatus +import org.opensearch.test.OpenSearchTestCase +import java.util.concurrent.TimeUnit class AlertIndicesIT : AlertingRestTestCase() { @@ -235,7 +237,14 @@ class AlertIndicesIT : AlertingRestTestCase() { client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s") // Give some time for history to be rolled over and cleared - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + val alertIndices = getAlertIndices().size + val docCount = getAlertHistoryDocCount() + if (alertIndices > 2 || docCount > 0) { + return@waitUntil false + } + return@waitUntil true + }, 30, TimeUnit.SECONDS) // Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted. // This leaves two indices: alert index and an empty history write index @@ -284,7 +293,14 @@ class AlertIndicesIT : AlertingRestTestCase() { client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s") // Give some time for history to be rolled over and cleared - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + val alertIndices = getAlertIndices().size + val docCount = getAlertHistoryDocCount() + if (alertIndices > 2 || docCount > 0) { + return@waitUntil false + } + return@waitUntil true + }, 30, TimeUnit.SECONDS) // Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted. // This leaves two indices: alert index and an empty history write index diff --git a/core/src/main/resources/settings/doc-level-queries.json b/core/src/main/resources/settings/doc-level-queries.json new file mode 100644 index 000000000..c5cbfa445 --- /dev/null +++ b/core/src/main/resources/settings/doc-level-queries.json @@ -0,0 +1,10 @@ +{ + "index": { + "mapping": { + "total_fields": { + "limit": 10000 + } + }, + "hidden": true + } +} \ No newline at end of file