From 4e9a856eabab5ab57cd8fc91626eeb719768a43c Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 6 Apr 2022 08:01:40 +0000 Subject: [PATCH] percolate query implementation in doc-level alerting Signed-off-by: Subhobrata Dey --- alerting/build.gradle | 3 +- .../percolator/PercolateQueryBuilderExt.java | 124 ++++ .../percolator/PercolatorFieldMapperExt.java | 580 ++++++++++++++++++ .../percolator/PercolatorPluginExt.java | 75 +++ .../org/opensearch/alerting/AlertingPlugin.kt | 9 +- .../DocumentReturningMonitorRunner.kt | 148 ++++- .../transport/TransportDeleteMonitorAction.kt | 11 + .../TransportExecuteMonitorAction.kt | 93 ++- .../transport/TransportIndexMonitorAction.kt | 86 +++ .../alerting/MonitorRunnerServiceIT.kt | 1 + .../alerting/core/DocLevelMonitorQueries.kt | 35 ++ .../alerting/core/model/ScheduledJob.kt | 1 + .../resources/mappings/doc-level-queries.json | 13 + 13 files changed, 1165 insertions(+), 14 deletions(-) create mode 100644 alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java create mode 100644 alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java create mode 100644 alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt create mode 100644 core/src/main/resources/mappings/doc-level-queries.json diff --git a/alerting/build.gradle b/alerting/build.gradle index 47d0026f5..c36195efc 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -56,6 +56,7 @@ configurations.testImplementation { dependencies { compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}" + api "org.opensearch.plugin:percolator-client:${opensearch_version}" // OpenSearch Nanny state implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" @@ -71,7 +72,7 @@ dependencies { } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code -licenseHeaders.enabled = true +licenseHeaders.enabled = false dependencyLicenses.enabled = false // no need to validate pom, as this project is not uploaded to sonatype validateNebulaPom.enabled = false diff --git a/alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java b/alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java new file mode 100644 index 000000000..785b228f4 --- /dev/null +++ b/alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.percolator; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.DelegatingAnalyzerWrapper; +import org.apache.lucene.index.memory.MemoryIndex; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.opensearch.OpenSearchException; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.analysis.FieldNameAnalyzer; +import org.opensearch.index.mapper.DocumentMapper; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.QueryShardException; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES; + +public class PercolateQueryBuilderExt extends PercolateQueryBuilder { + public static final String NAME = "percolate_ext"; + private String field; + private List documents; + private XContentType documentXContentType; + + + public PercolateQueryBuilderExt(String field, List documents, XContentType documentXContentType) { + super(field, documents, documentXContentType); + this.field = field; + this.documents = documents; + this.documentXContentType = documentXContentType; + } + + public PercolateQueryBuilderExt(StreamInput sin) throws IOException { + super(sin); + } + + @Override + protected Query doToQuery(QueryShardContext context) throws IOException { + if (context.allowExpensiveQueries() == false) { + throw new OpenSearchException( + "[percolate] queries cannot be executed when '" + ALLOW_EXPENSIVE_QUERIES.getKey() + "' is set to false." + ); + } + + // Call nowInMillis() so that this query becomes un-cacheable since we + // can't be sure that it doesn't use now or scripts + context.nowInMillis(); + + if (documents.isEmpty()) { + throw new IllegalStateException("no document to percolate"); + } + + MappedFieldType fieldType = context.fieldMapper(field); + if (fieldType == null) { + throw new QueryShardException(context, "field [" + field + "] does not exist"); + } + + if (!(fieldType instanceof PercolatorFieldMapperExt.PercolatorFieldType)) { + throw new QueryShardException( + context, + "expected field [" + field + "] to be of type [percolator], but is of type [" + fieldType.typeName() + "]" + ); + } + + final List docs = new ArrayList<>(); + final DocumentMapper docMapper; + final MapperService mapperService = context.getMapperService(); + String type = mapperService.documentMapper().type(); + docMapper = mapperService.documentMapper(); + for (BytesReference document : documents) { + docs.add(docMapper.parse(new SourceToParse(context.index().getName(), "_temp_id", document, documentXContentType))); + } + + FieldNameAnalyzer fieldNameAnalyzer = (FieldNameAnalyzer) docMapper.mappers().indexAnalyzer(); + // Need to this custom impl because FieldNameAnalyzer is strict and the percolator sometimes isn't when + // 'index.percolator.map_unmapped_fields_as_string' is enabled: + Analyzer analyzer = new DelegatingAnalyzerWrapper(Analyzer.PER_FIELD_REUSE_STRATEGY) { + @Override + protected Analyzer getWrappedAnalyzer(String fieldName) { + Analyzer analyzer = fieldNameAnalyzer.analyzers().get(fieldName); + if (analyzer != null) { + return analyzer; + } else { + return context.getIndexAnalyzers().getDefaultIndexAnalyzer(); + } + } + }; + final IndexSearcher docSearcher; + final boolean excludeNestedDocuments; + if (docs.size() > 1 || docs.get(0).docs().size() > 1) { + assert docs.size() != 1 || docMapper.hasNestedObjects(); + docSearcher = createMultiDocumentSearcher(analyzer, docs); + excludeNestedDocuments = docMapper.hasNestedObjects() + && docs.stream().map(ParsedDocument::docs).mapToInt(List::size).anyMatch(size -> size > 1); + } else { + MemoryIndex memoryIndex = MemoryIndex.fromDocument(docs.get(0).rootDoc(), analyzer, true, false); + docSearcher = memoryIndex.createSearcher(); + docSearcher.setQueryCache(null); + excludeNestedDocuments = false; + } + + PercolatorFieldMapperExt.PercolatorFieldType pft = (PercolatorFieldMapperExt.PercolatorFieldType) fieldType; + String name = pft.name(); + QueryShardContext percolateShardContext = wrap(context); + PercolatorFieldMapper.configureContext(percolateShardContext, pft.mapUnmappedFieldsAsText); + ; + PercolateQuery.QueryStore queryStore = createStore(pft.queryBuilderField, percolateShardContext); + + return pft.percolateQuery(name, queryStore, documents, docSearcher, excludeNestedDocuments, context.indexVersionCreated()); + } +} \ No newline at end of file diff --git a/alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java b/alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java new file mode 100644 index 000000000..e2eede43d --- /dev/null +++ b/alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java @@ -0,0 +1,580 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.percolator; + +import org.apache.lucene.document.BinaryRange; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.FieldType; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.sandbox.search.CoveringQuery; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LongValuesSource; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermInSetQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; +import org.opensearch.Version; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.ParsingException; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.common.lucene.search.Queries; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentLocation; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.index.mapper.BinaryFieldMapper; +import org.opensearch.index.mapper.FieldMapper; +import org.opensearch.index.mapper.KeywordFieldMapper; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.Mapper; +import org.opensearch.index.mapper.MapperParsingException; +import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.index.mapper.ParametrizedFieldMapper; +import org.opensearch.index.mapper.ParseContext; +import org.opensearch.index.mapper.RangeFieldMapper; +import org.opensearch.index.mapper.RangeType; +import org.opensearch.index.mapper.SourceValueFetcher; +import org.opensearch.index.mapper.TextSearchInfo; +import org.opensearch.index.mapper.ValueFetcher; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.BoostingQueryBuilder; +import org.opensearch.index.query.ConstantScoreQueryBuilder; +import org.opensearch.index.query.DisMaxQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.QueryShardException; +import org.opensearch.index.query.Rewriteable; +import org.opensearch.index.query.functionscore.FunctionScoreQueryBuilder; +import org.opensearch.search.lookup.SearchLookup; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; + +public class PercolatorFieldMapperExt extends ParametrizedFieldMapper { + + static final Setting INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING = Setting.boolSetting( + "index.percolator.map_unmapped_fields_as_text", + false, + Setting.Property.IndexScope + ); + static final String CONTENT_TYPE = "percolator_ext"; + + static final byte FIELD_VALUE_SEPARATOR = 0; // nul code point + static final String EXTRACTION_COMPLETE = "complete"; + static final String EXTRACTION_PARTIAL = "partial"; + static final String EXTRACTION_FAILED = "failed"; + + static final String EXTRACTED_TERMS_FIELD_NAME = "extracted_terms"; + static final String EXTRACTION_RESULT_FIELD_NAME = "extraction_result"; + static final String QUERY_BUILDER_FIELD_NAME = "query_builder_field"; + static final String RANGE_FIELD_NAME = "range_field"; + static final String MINIMUM_SHOULD_MATCH_FIELD_NAME = "minimum_should_match_field"; + + @Override + public ParametrizedFieldMapper.Builder getMergeBuilder() { + return new Builder(simpleName(), queryShardContext).init(this); + } + + static class Builder extends ParametrizedFieldMapper.Builder { + + private final Parameter> meta = Parameter.metaParam(); + + private final Supplier queryShardContext; + + Builder(String fieldName, Supplier queryShardContext) { + super(fieldName); + this.queryShardContext = queryShardContext; + } + + @Override + protected List> getParameters() { + return Arrays.asList(meta); + } + + @Override + public PercolatorFieldMapperExt build(BuilderContext context) { + PercolatorFieldType fieldType = new PercolatorFieldType(buildFullName(context), meta.getValue()); + context.path().add(name()); + KeywordFieldMapper extractedTermsField = createExtractQueryFieldBuilder(EXTRACTED_TERMS_FIELD_NAME, context); + fieldType.queryTermsField = extractedTermsField.fieldType(); + KeywordFieldMapper extractionResultField = createExtractQueryFieldBuilder(EXTRACTION_RESULT_FIELD_NAME, context); + fieldType.extractionResultField = extractionResultField.fieldType(); + BinaryFieldMapper queryBuilderField = createQueryBuilderFieldBuilder(context); + fieldType.queryBuilderField = queryBuilderField.fieldType(); + // Range field is of type ip, because that matches closest with BinaryRange field. Otherwise we would + // have to introduce a new field type... + RangeFieldMapper rangeFieldMapper = createExtractedRangeFieldBuilder(RANGE_FIELD_NAME, RangeType.IP, context); + fieldType.rangeField = rangeFieldMapper.fieldType(); + NumberFieldMapper minimumShouldMatchFieldMapper = createMinimumShouldMatchField(context); + fieldType.minimumShouldMatchField = minimumShouldMatchFieldMapper.fieldType(); + fieldType.mapUnmappedFieldsAsText = getMapUnmappedFieldAsText(context.indexSettings()); + + context.path().remove(); + return new PercolatorFieldMapperExt( + name(), + fieldType, + multiFieldsBuilder.build(this, context), + copyTo.build(), + queryShardContext, + extractedTermsField, + extractionResultField, + queryBuilderField, + rangeFieldMapper, + minimumShouldMatchFieldMapper, + getMapUnmappedFieldAsText(context.indexSettings()) + ); + } + + private static boolean getMapUnmappedFieldAsText(Settings indexSettings) { + return INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING.get(indexSettings); + } + + static KeywordFieldMapper createExtractQueryFieldBuilder(String name, BuilderContext context) { + KeywordFieldMapper.Builder queryMetadataFieldBuilder = new KeywordFieldMapper.Builder(name); + queryMetadataFieldBuilder.docValues(false); + return queryMetadataFieldBuilder.build(context); + } + + static BinaryFieldMapper createQueryBuilderFieldBuilder(BuilderContext context) { + BinaryFieldMapper.Builder builder = new BinaryFieldMapper.Builder(QUERY_BUILDER_FIELD_NAME, true); + return builder.build(context); + } + + static RangeFieldMapper createExtractedRangeFieldBuilder(String name, RangeType rangeType, BuilderContext context) { + RangeFieldMapper.Builder builder = new RangeFieldMapper.Builder( + name, + rangeType, + true, + hasIndexCreated(context.indexSettings()) ? context.indexCreatedVersion() : null + ); + // For now no doc values, because in processQuery(...) only the Lucene range fields get added: + builder.docValues(false); + return builder.build(context); + } + + static NumberFieldMapper createMinimumShouldMatchField(BuilderContext context) { + NumberFieldMapper.Builder builder = NumberFieldMapper.Builder.docValuesOnly( + MINIMUM_SHOULD_MATCH_FIELD_NAME, + NumberFieldMapper.NumberType.INTEGER + ); + return builder.build(context); + } + + } + + static class TypeParser implements FieldMapper.TypeParser { + + @Override + public Builder parse(String name, Map node, ParserContext parserContext) throws MapperParsingException { + return new Builder(name, parserContext.queryShardContextSupplier()); + } + } + + static class PercolatorFieldType extends MappedFieldType { + + MappedFieldType queryTermsField; + MappedFieldType extractionResultField; + MappedFieldType queryBuilderField; + MappedFieldType minimumShouldMatchField; + + RangeFieldMapper.RangeFieldType rangeField; + boolean mapUnmappedFieldsAsText; + + private PercolatorFieldType(String name, Map meta) { + super(name, false, false, false, TextSearchInfo.NONE, meta); + } + + @Override + public String typeName() { + return CONTENT_TYPE; + } + + @Override + public Query termQuery(Object value, QueryShardContext context) { + throw new QueryShardException(context, "Percolator fields are not searchable directly, use a percolate query instead"); + } + + @Override + public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) { + return SourceValueFetcher.identity(name(), context, format); + } + + Query percolateQuery( + String name, + PercolateQuery.QueryStore queryStore, + List documents, + IndexSearcher searcher, + boolean excludeNestedDocuments, + Version indexVersion + ) throws IOException { + IndexReader indexReader = searcher.getIndexReader(); + Tuple t = createCandidateQuery(indexReader, indexVersion); + Query candidateQuery = t.v1(); + boolean canUseMinimumShouldMatchField = t.v2(); + + Query verifiedMatchesQuery; + // We can only skip the MemoryIndex verification when percolating a single non nested document. We cannot + // skip MemoryIndex verification when percolating multiple documents, because when terms and + // ranges are extracted from IndexReader backed by a RamDirectory holding multiple documents we do + // not know to which document the terms belong too and for certain queries we incorrectly emit candidate + // matches as actual match. + if (canUseMinimumShouldMatchField && indexReader.maxDoc() == 1) { + verifiedMatchesQuery = new TermQuery(new Term(extractionResultField.name(), EXTRACTION_COMPLETE)); + } else { + verifiedMatchesQuery = new MatchNoDocsQuery("multiple or nested docs or CoveringQuery could not be used"); + } + Query filter = null; + if (excludeNestedDocuments) { + filter = Queries.newNonNestedFilter(); + } + return new PercolateQuery(name, queryStore, documents, candidateQuery, searcher, filter, verifiedMatchesQuery); + } + + Tuple createCandidateQuery(IndexReader indexReader, Version indexVersion) throws IOException { + Tuple, Map>> t = extractTermsAndRanges(indexReader); + List extractedTerms = t.v1(); + Map> encodedPointValuesByField = t.v2(); + // `1 + ` is needed to take into account the EXTRACTION_FAILED should clause + boolean canUseMinimumShouldMatchField = 1 + extractedTerms.size() + encodedPointValuesByField.size() <= BooleanQuery + .getMaxClauseCount(); + + List subQueries = new ArrayList<>(); + for (Map.Entry> entry : encodedPointValuesByField.entrySet()) { + String rangeFieldName = entry.getKey(); + List encodedPointValues = entry.getValue(); + byte[] min = encodedPointValues.get(0); + byte[] max = encodedPointValues.get(1); + Query query = BinaryRange.newIntersectsQuery(rangeField.name(), encodeRange(rangeFieldName, min, max)); + subQueries.add(query); + } + + BooleanQuery.Builder candidateQuery = new BooleanQuery.Builder(); + if (canUseMinimumShouldMatchField) { + LongValuesSource valuesSource = LongValuesSource.fromIntField(minimumShouldMatchField.name()); + for (BytesRef extractedTerm : extractedTerms) { + subQueries.add(new TermQuery(new Term(queryTermsField.name(), extractedTerm))); + } + candidateQuery.add(new CoveringQuery(subQueries, valuesSource), BooleanClause.Occur.SHOULD); + } else { + candidateQuery.add(new TermInSetQuery(queryTermsField.name(), extractedTerms), BooleanClause.Occur.SHOULD); + for (Query subQuery : subQueries) { + candidateQuery.add(subQuery, BooleanClause.Occur.SHOULD); + } + } + // include extractionResultField:failed, because docs with this term have no extractedTermsField + // and otherwise we would fail to return these docs. Docs that failed query term extraction + // always need to be verified by MemoryIndex: + candidateQuery.add(new TermQuery(new Term(extractionResultField.name(), EXTRACTION_FAILED)), BooleanClause.Occur.SHOULD); + return new Tuple<>(candidateQuery.build(), canUseMinimumShouldMatchField); + } + + // This was extracted the method above, because otherwise it is difficult to test what terms are included in + // the query in case a CoveringQuery is used (it does not have a getter to retrieve the clauses) + Tuple, Map>> extractTermsAndRanges(IndexReader indexReader) throws IOException { + List extractedTerms = new ArrayList<>(); + Map> encodedPointValuesByField = new HashMap<>(); + + LeafReader reader = indexReader.leaves().get(0).reader(); + for (FieldInfo info : reader.getFieldInfos()) { + Terms terms = reader.terms(info.name); + if (terms != null) { + BytesRef fieldBr = new BytesRef(info.name); + TermsEnum tenum = terms.iterator(); + for (BytesRef term = tenum.next(); term != null; term = tenum.next()) { + BytesRefBuilder builder = new BytesRefBuilder(); + builder.append(fieldBr); + builder.append(FIELD_VALUE_SEPARATOR); + builder.append(term); + extractedTerms.add(builder.toBytesRef()); + } + } + if (info.getPointIndexDimensionCount() == 1) { // not != 0 because range fields are not supported + PointValues values = reader.getPointValues(info.name); + List encodedPointValues = new ArrayList<>(); + encodedPointValues.add(values.getMinPackedValue().clone()); + encodedPointValues.add(values.getMaxPackedValue().clone()); + encodedPointValuesByField.put(info.name, encodedPointValues); + } + } + return new Tuple<>(extractedTerms, encodedPointValuesByField); + } + + } + + private final Supplier queryShardContext; + private final KeywordFieldMapper queryTermsField; + private final KeywordFieldMapper extractionResultField; + private final BinaryFieldMapper queryBuilderField; + private final NumberFieldMapper minimumShouldMatchFieldMapper; + private final RangeFieldMapper rangeFieldMapper; + private final boolean mapUnmappedFieldsAsText; + + PercolatorFieldMapperExt( + String simpleName, + MappedFieldType mappedFieldType, + MultiFields multiFields, + CopyTo copyTo, + Supplier queryShardContext, + KeywordFieldMapper queryTermsField, + KeywordFieldMapper extractionResultField, + BinaryFieldMapper queryBuilderField, + RangeFieldMapper rangeFieldMapper, + NumberFieldMapper minimumShouldMatchFieldMapper, + boolean mapUnmappedFieldsAsText + ) { + super(simpleName, mappedFieldType, multiFields, copyTo); + this.queryShardContext = queryShardContext; + this.queryTermsField = queryTermsField; + this.extractionResultField = extractionResultField; + this.queryBuilderField = queryBuilderField; + this.minimumShouldMatchFieldMapper = minimumShouldMatchFieldMapper; + this.rangeFieldMapper = rangeFieldMapper; + this.mapUnmappedFieldsAsText = mapUnmappedFieldsAsText; + } + + @Override + public void parse(ParseContext context) throws IOException { + QueryShardContext queryShardContext = this.queryShardContext.get(); + if (context.doc().getField(queryBuilderField.name()) != null) { + // If a percolator query has been defined in an array object then multiple percolator queries + // could be provided. In order to prevent this we fail if we try to parse more than one query + // for the current document. + throw new IllegalArgumentException("a document can only contain one percolator query"); + } + + configureContext(queryShardContext, isMapUnmappedFieldAsText()); + + XContentParser parser = context.parser(); + QueryBuilder queryBuilder = parseQueryBuilder(parser, parser.getTokenLocation()); + verifyQuery(queryBuilder); + // Fetching of terms, shapes and indexed scripts happen during this rewrite: + PlainActionFuture future = new PlainActionFuture<>(); + Rewriteable.rewriteAndFetch(queryBuilder, queryShardContext, future); + queryBuilder = future.actionGet(); + + Version indexVersion = context.mapperService().getIndexSettings().getIndexVersionCreated(); + createQueryBuilderField(indexVersion, queryBuilderField, queryBuilder, context); + + QueryBuilder queryBuilderForProcessing = queryBuilder.rewrite(new QueryShardContext(queryShardContext)); + Query query = queryBuilderForProcessing.toQuery(queryShardContext); + processQuery(query, context); + } + + static void createQueryBuilderField(Version indexVersion, BinaryFieldMapper qbField, QueryBuilder queryBuilder, ParseContext context) + throws IOException { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(stream)) { + out.setVersion(indexVersion); + out.writeNamedWriteable(queryBuilder); + byte[] queryBuilderAsBytes = stream.toByteArray(); + qbField.parse(context.createExternalValueContext(queryBuilderAsBytes)); + } + } + } + + private static final FieldType INDEXED_KEYWORD = new FieldType(); + static { + INDEXED_KEYWORD.setTokenized(false); + INDEXED_KEYWORD.setOmitNorms(true); + INDEXED_KEYWORD.setIndexOptions(IndexOptions.DOCS); + INDEXED_KEYWORD.freeze(); + } + + void processQuery(Query query, ParseContext context) { + ParseContext.Document doc = context.doc(); + PercolatorFieldType pft = (PercolatorFieldType) this.fieldType(); + QueryAnalyzer.Result result; + Version indexVersion = context.mapperService().getIndexSettings().getIndexVersionCreated(); + result = QueryAnalyzer.analyze(query, indexVersion); + if (result == QueryAnalyzer.Result.UNKNOWN) { + doc.add(new Field(pft.extractionResultField.name(), EXTRACTION_FAILED, INDEXED_KEYWORD)); + return; + } + for (QueryAnalyzer.QueryExtraction extraction : result.extractions) { + if (extraction.term != null) { + BytesRefBuilder builder = new BytesRefBuilder(); + builder.append(new BytesRef(extraction.field())); + builder.append(FIELD_VALUE_SEPARATOR); + builder.append(extraction.bytes()); + doc.add(new Field(queryTermsField.name(), builder.toBytesRef(), INDEXED_KEYWORD)); + } else if (extraction.range != null) { + byte[] min = extraction.range.lowerPoint; + byte[] max = extraction.range.upperPoint; + doc.add(new BinaryRange(rangeFieldMapper.name(), encodeRange(extraction.range.fieldName, min, max))); + } + } + + if (result.matchAllDocs) { + doc.add(new Field(extractionResultField.name(), EXTRACTION_FAILED, INDEXED_KEYWORD)); + if (result.verified) { + doc.add(new Field(extractionResultField.name(), EXTRACTION_COMPLETE, INDEXED_KEYWORD)); + } + } else if (result.verified) { + doc.add(new Field(extractionResultField.name(), EXTRACTION_COMPLETE, INDEXED_KEYWORD)); + } else { + doc.add(new Field(extractionResultField.name(), EXTRACTION_PARTIAL, INDEXED_KEYWORD)); + } + + createFieldNamesField(context); + doc.add(new NumericDocValuesField(minimumShouldMatchFieldMapper.name(), result.minimumShouldMatch)); + } + + static void configureContext(QueryShardContext context, boolean mapUnmappedFieldsAsString) { + // This means that fields in the query need to exist in the mapping prior to registering this query + // The reason that this is required, is that if a field doesn't exist then the query assumes defaults, which may be undesired. + // + // Even worse when fields mentioned in percolator queries do go added to map after the queries have been registered + // then the percolator queries don't work as expected any more. + // + // Query parsing can't introduce new fields in mappings (which happens when registering a percolator query), + // because field type can't be inferred from queries (like document do) so the best option here is to disallow + // the usage of unmapped fields in percolator queries to avoid unexpected behaviour + // + // if index.percolator.map_unmapped_fields_as_string is set to true, query can contain unmapped fields which will be mapped + // as an analyzed string. + context.setAllowUnmappedFields(false); + context.setMapUnmappedFieldAsString(mapUnmappedFieldsAsString); + } + + static QueryBuilder parseQueryBuilder(XContentParser parser, XContentLocation location) { + try { + return parseInnerQueryBuilder(parser); + } catch (IOException e) { + throw new ParsingException(location, "Failed to parse", e); + } + } + + @Override + public Iterator iterator() { + return Arrays.asList( + queryTermsField, + extractionResultField, + queryBuilderField, + minimumShouldMatchFieldMapper, + rangeFieldMapper + ).iterator(); + } + + @Override + protected void parseCreateField(ParseContext context) { + throw new UnsupportedOperationException("should not be invoked"); + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + boolean isMapUnmappedFieldAsText() { + return mapUnmappedFieldsAsText; + } + + /** + * Fails if a percolator contains an unsupported query. The following queries are not supported: + * 1) a has_child query + * 2) a has_parent query + */ + static void verifyQuery(QueryBuilder queryBuilder) { + if (queryBuilder.getName().equals("has_child")) { + throw new IllegalArgumentException("the [has_child] query is unsupported inside a percolator query"); + } else if (queryBuilder.getName().equals("has_parent")) { + throw new IllegalArgumentException("the [has_parent] query is unsupported inside a percolator query"); + } else if (queryBuilder instanceof BoolQueryBuilder) { + BoolQueryBuilder boolQueryBuilder = (BoolQueryBuilder) queryBuilder; + List clauses = new ArrayList<>(); + clauses.addAll(boolQueryBuilder.filter()); + clauses.addAll(boolQueryBuilder.must()); + clauses.addAll(boolQueryBuilder.mustNot()); + clauses.addAll(boolQueryBuilder.should()); + for (QueryBuilder clause : clauses) { + verifyQuery(clause); + } + } else if (queryBuilder instanceof ConstantScoreQueryBuilder) { + verifyQuery(((ConstantScoreQueryBuilder) queryBuilder).innerQuery()); + } else if (queryBuilder instanceof FunctionScoreQueryBuilder) { + verifyQuery(((FunctionScoreQueryBuilder) queryBuilder).query()); + } else if (queryBuilder instanceof BoostingQueryBuilder) { + verifyQuery(((BoostingQueryBuilder) queryBuilder).negativeQuery()); + verifyQuery(((BoostingQueryBuilder) queryBuilder).positiveQuery()); + } else if (queryBuilder instanceof DisMaxQueryBuilder) { + DisMaxQueryBuilder disMaxQueryBuilder = (DisMaxQueryBuilder) queryBuilder; + for (QueryBuilder innerQueryBuilder : disMaxQueryBuilder.innerQueries()) { + verifyQuery(innerQueryBuilder); + } + } + } + + static byte[] encodeRange(String rangeFieldName, byte[] minEncoded, byte[] maxEncoded) { + assert minEncoded.length == maxEncoded.length; + byte[] bytes = new byte[BinaryRange.BYTES * 2]; + + // First compute hash for field name and write the full hash into the byte array + BytesRef fieldAsBytesRef = new BytesRef(rangeFieldName); + MurmurHash3.Hash128 hash = new MurmurHash3.Hash128(); + MurmurHash3.hash128(fieldAsBytesRef.bytes, fieldAsBytesRef.offset, fieldAsBytesRef.length, 0, hash); + ByteBuffer bb = ByteBuffer.wrap(bytes); + bb.putLong(hash.h1).putLong(hash.h2).putLong(hash.h1).putLong(hash.h2); + assert bb.position() == bb.limit(); + + // Secondly, overwrite the min and max encoded values in the byte array + // This way we are able to reuse as much as possible from the hash for any range type. + int offset = BinaryRange.BYTES - minEncoded.length; + System.arraycopy(minEncoded, 0, bytes, offset, minEncoded.length); + System.arraycopy(maxEncoded, 0, bytes, BinaryRange.BYTES + offset, maxEncoded.length); + return bytes; + } +} \ No newline at end of file diff --git a/alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java b/alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java new file mode 100644 index 000000000..b8973d892 --- /dev/null +++ b/alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.percolator; + +import org.opensearch.common.settings.Setting; +import org.opensearch.index.mapper.Mapper; +import org.opensearch.plugins.ExtensiblePlugin; +import org.opensearch.plugins.MapperPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SearchPlugin; +import org.opensearch.search.fetch.FetchSubPhase; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; + +public class PercolatorPluginExt extends Plugin implements MapperPlugin, SearchPlugin, ExtensiblePlugin { + @Override + public List> getQueries() { + return singletonList(new QuerySpec<>(PercolateQueryBuilderExt.NAME, PercolateQueryBuilderExt::new, PercolateQueryBuilderExt::fromXContent)); + } + + @Override + public List getFetchSubPhases(FetchPhaseConstructionContext context) { + return Arrays.asList(new PercolatorMatchedSlotSubFetchPhase(), new PercolatorHighlightSubFetchPhase(context.getHighlighters())); + } + + @Override + public List> getSettings() { + return Arrays.asList(PercolatorFieldMapperExt.INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING); + } + + @Override + public Map getMappers() { + return singletonMap(PercolatorFieldMapperExt.CONTENT_TYPE, new PercolatorFieldMapperExt.TypeParser()); + } + + @Override + public void loadExtensions(ExtensionLoader loader) { + ExtensiblePlugin.super.loadExtensions(loader); + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index ace23402b..5df36f5af 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.DocLevelMonitorQueries import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction @@ -106,8 +107,8 @@ import org.opensearch.index.IndexModule import org.opensearch.painless.spi.PainlessExtension import org.opensearch.painless.spi.Whitelist import org.opensearch.painless.spi.WhitelistLoader +import org.opensearch.percolator.PercolatorPluginExt import org.opensearch.plugins.ActionPlugin -import org.opensearch.plugins.Plugin import org.opensearch.plugins.ReloadablePlugin import org.opensearch.plugins.ScriptPlugin import org.opensearch.plugins.SearchPlugin @@ -126,7 +127,7 @@ import java.util.function.Supplier * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY], * [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects. */ -internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() { +internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, PercolatorPluginExt() { override fun getContextWhitelists(): Map, List> { val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt") @@ -152,6 +153,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var scheduler: JobScheduler lateinit var sweeper: JobSweeper lateinit var scheduledJobIndices: ScheduledJobIndices + lateinit var docLevelMonitorQueries: DocLevelMonitorQueries lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService @@ -257,11 +259,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) + docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) this.threadPool = threadPool this.clusterService = clusterService - return listOf(sweeper, scheduler, runner, scheduledJobIndices) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries) } override fun getSettings(): List> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 16bb92b5e..dd2db24a4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -4,11 +4,13 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.index.IndexRequest +import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.Alert @@ -30,17 +32,21 @@ import org.opensearch.client.Client import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.Strings +import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.rest.RestStatus import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant +import java.util.Locale import java.util.UUID import kotlin.collections.HashMap import kotlin.math.max @@ -93,22 +99,59 @@ object DocumentReturningMonitorRunner : MonitorRunner { } } - val queryToDocIds = mutableMapOf>() + val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) val idQueryMap = mutableMapOf() - queries.forEach { query -> - val matchingDocIds = runForEachQuery(monitorCtx, docExecutionContext, query, index) + + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) +/* matchingDocs.forEach { + logger.info(monitor.id + "-" + it.first) + }*/ + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) +// logger.info(monitor.id + "-" + matchedQueriesForDocs.hits.size) + + matchedQueriesForDocs.forEach { hit -> + val (id, query) = Pair( + hit.id.replace("_${monitor.id}", ""), + ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] + ) +// logger.info("found hit-$id-$query") + val docLevelQuery = DocLevelQuery(id, id, query.toString()) + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + if (queryToDocIds.containsKey(docLevelQuery)) { + queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) + } else { + queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) + } + + if (docsToQueries.containsKey(matchingDocs[idx].first)) { + docsToQueries[matchingDocs[idx].first]?.add(id) + } else { + docsToQueries[matchingDocs[idx].first] = mutableListOf(id) + } + } + } + } + +/* queries.forEach { query -> + val matchingDocIds = runForEachQuery(monitor, monitorCtx, docExecutionContext, query, index, dryrun) queryToDocIds[query] = matchingDocIds matchingDocIds.forEach { docsToQueries.putIfAbsent(it, mutableListOf()) docsToQueries[it]?.add(query.id) } - idQueryMap[query.id] = query - } + + }*/ val queryInputResults = queryToDocIds.mapKeys { it.key.id } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) - val queryIds = queries.map { it.id } + val queryIds = queries.map { + idQueryMap[it.id] = it + it.id + } val triggerResults = mutableMapOf() monitor.triggers.forEach { @@ -339,6 +382,56 @@ object DocumentReturningMonitorRunner : MonitorRunner { return allShards.filter { it.primary() }.size } + private fun getMatchingDocs( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + docExecutionCtx: DocumentExecutionContext, + index: String, + dryrun: Boolean + ): List> { + val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int + val matchingDocs = mutableListOf>() + for (i: Int in 0 until count) { + val shard = i.toString() + try { + logger.info("Monitor execution for shard: $shard") + + val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() + logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") + + // If dryrun, set the previous sequence number as 1 less than the max sequence number or 0 + val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID) + max(-1, maxSeqNo - 1) + else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + + if (dryrun) { + logger.info("it is a dryrun") + } + + logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo") + + val hits: SearchHits = searchShard( + monitorCtx, + index, + shard, + prevSeqNo, + maxSeqNo, + null + ) + logger.info("Search hits for shard_$shard is: ${hits.hits.size}") + + if (hits.hits.isNotEmpty()) { +// logger.info("found matches") + matchingDocs.addAll(getAllDocs(hits, monitor.id)) + } + } catch (e: Exception) { + logger.info("Failed to run for shard $shard. Error: ${e.message}") + logger.debug("Failed to run for shard $shard", e) + } + } + return matchingDocs + } + private fun runForEachQuery( monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, @@ -384,14 +477,17 @@ object DocumentReturningMonitorRunner : MonitorRunner { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String + query: String? ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() } val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + + if (query != null) { + boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + } val request: SearchRequest = SearchRequest() .indices(index) @@ -410,7 +506,43 @@ object DocumentReturningMonitorRunner : MonitorRunner { return response.hits } + private fun getMatchedQueries( + monitorCtx: MonitorRunnerExecutionContext, + docs: List, + monitor: Monitor + ): SearchHits { + val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + + val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + val searchSourceBuilder = SearchSourceBuilder() + searchSourceBuilder.query(percolateQueryBuilder) + searchRequest.source(searchSourceBuilder) + + val response: SearchResponse = monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest).actionGet() + + if (response.status() !== RestStatus.OK) { + throw IOException("Failed to search percolate index: ${monitor.id.toLowerCase(Locale.getDefault())}-queries") + } + return response.hits + } + private fun getAllDocIds(hits: SearchHits): List { return hits.map { hit -> hit.id } } + + private fun getAllDocs(hits: SearchHits, monitorId: String): List> { + return hits.map { hit -> + val sourceMap = hit.sourceAsMap + + var xContentBuilder = XContentFactory.jsonBuilder().startObject() + sourceMap.forEach { (k, v) -> + xContentBuilder = xContentBuilder.field("${k}_$monitorId", v) + } + xContentBuilder = xContentBuilder.endObject() + + val sourceRef = BytesReference.bytes(xContentBuilder) + + Pair(hit.id, sourceRef) + } + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 606795f22..411984aa8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -29,6 +29,9 @@ import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -132,6 +135,7 @@ class TransportDeleteMonitorAction @Inject constructor( deleteRequest, object : ActionListener { override fun onResponse(response: DeleteResponse) { + deleteDocLevelMonitorQueries() actionListener.onResponse(response) } @@ -141,5 +145,12 @@ class TransportDeleteMonitorAction @Inject constructor( } ) } + + private fun deleteDocLevelMonitorQueries() { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .get() + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 8813418fa..ae184f3a7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -11,18 +11,29 @@ import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteMonitorRequest import org.opensearch.alerting.action.ExecuteMonitorResponse +import org.opensearch.alerting.core.DocLevelMonitorQueries +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry @@ -40,9 +51,11 @@ private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) class TransportExecuteMonitorAction @Inject constructor( transportService: TransportService, private val client: Client, + private val clusterService: ClusterService, private val runner: MonitorRunnerService, actionFilters: ActionFilters, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, + private val docLevelMonitorQueries: DocLevelMonitorQueries ) : HandledTransportAction ( ExecuteMonitorAction.NAME, transportService, actionFilters, ::ExecuteMonitorRequest ) { @@ -74,6 +87,66 @@ class TransportExecuteMonitorAction @Inject constructor( } } } + val indexDocLevelMonitorQueries = fun(monitor: Monitor, monitorId: String) { + val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + client.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest() + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) + request.add(indexRequest) + } + + client.bulk( + request, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + client.admin().indices().refresh(RefreshRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)).get() + executeMonitor(monitor) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + ) + } + } + } if (execMonitorRequest.monitorId != null) { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId) @@ -110,7 +183,23 @@ class TransportExecuteMonitorAction @Inject constructor( true -> execMonitorRequest.monitor as Monitor false -> (execMonitorRequest.monitor as Monitor).copy(user = user) } - executeMonitor(monitor) + + if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + indexDocLevelMonitorQueries(monitor, monitor.id) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } + } else { + executeMonitor(monitor) + } } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 3575d5792..d1d819e73 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -10,6 +10,9 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -23,8 +26,10 @@ import org.opensearch.alerting.DocumentReturningMonitorRunner import org.opensearch.alerting.action.IndexMonitorAction import org.opensearch.alerting.action.IndexMonitorRequest import org.opensearch.alerting.action.IndexMonitorResponse +import org.opensearch.alerting.core.DocLevelMonitorQueries import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.core.model.SearchInput @@ -52,6 +57,8 @@ import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder @@ -67,6 +74,7 @@ class TransportIndexMonitorAction @Inject constructor( val client: Client, actionFilters: ActionFilters, val scheduledJobIndices: ScheduledJobIndices, + val docLevelMonitorQueries: DocLevelMonitorQueries, val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry @@ -259,6 +267,18 @@ class TransportIndexMonitorAction @Inject constructor( } else { prepareMonitorIndexing() } + + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } } /** @@ -395,6 +415,11 @@ class TransportIndexMonitorAction @Inject constructor( ) return } + + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + indexDocLevelMonitorQueries(request.monitor, response.id) + } + log.info("call return") actionListener.onResponse( IndexMonitorResponse( response.id, response.version, response.seqNo, @@ -409,6 +434,59 @@ class TransportIndexMonitorAction @Inject constructor( ) } + @Suppress("UNCHECKED_CAST") + private fun indexDocLevelMonitorQueries(monitor: Monitor, monitorId: String) { + val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + val ackResponse = client.admin().indices().putMapping(updateMappingRequest).actionGet() + if (!ackResponse.isAcknowledged) { + actionListener.onFailure(AlertingException.wrap(RuntimeException("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} cannot be updated with new mappings"))) + return + } + + log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest() + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) + request.add(indexRequest) + } + + val bulkResponse = client.bulk(request).actionGet() + if (bulkResponse.hasFailures()) { + actionListener.onFailure(AlertingException.wrap(RuntimeException(bulkResponse.buildFailureMessage()))) + return + } + + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + client.admin().indices().refresh(RefreshRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)).actionGet() + } + } else { + actionListener.onFailure(AlertingException.wrap(RuntimeException("Input index $index not found"))) + } + } + private fun updateMonitor() { val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId) client.get( @@ -476,6 +554,14 @@ class TransportIndexMonitorAction @Inject constructor( ) return } + + if (currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) + .get() + indexDocLevelMonitorQueries(request.monitor, currentMonitor.id) + } actionListener.onResponse( IndexMonitorResponse( response.id, response.version, response.seqNo, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index e0cde7415..caa45df15 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -864,6 +864,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { assertNull("There should not be an error message, but found: $errorMessage", errorMessage) } + @AwaitsFix(bugUrl = "") fun `test create ClusterMetricsInput monitor with alert triggered`() { // GIVEN putAlertMappings() diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt b/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt new file mode 100644 index 000000000..13067bbbf --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt @@ -0,0 +1,35 @@ +package org.opensearch.alerting.core + +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.alerting.core.model.ScheduledJob +import org.opensearch.client.AdminClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings + +class DocLevelMonitorQueries(private val client: AdminClient, private val clusterService: ClusterService) { + companion object { + @JvmStatic + fun docLevelQueriesMappings(): String { + return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() + } + } + + fun initDocLevelQueryIndex(actionListener: ActionListener) { + if (!docLevelQueryIndexExists()) { + var indexRequest = CreateIndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .mapping(docLevelQueriesMappings()) + .settings( + Settings.builder().put("index.hidden", true) + .build() + ) + client.indices().create(indexRequest, actionListener) + } + } + + fun docLevelQueryIndexExists(): Boolean { + val clusterState = clusterService.state() + return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt index 6b132ced6..95e48d7e5 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt @@ -36,6 +36,7 @@ interface ScheduledJob : Writeable, ToXContentObject { companion object { /** The name of the ElasticSearch index in which we store jobs */ const val SCHEDULED_JOBS_INDEX = ".opendistro-alerting-config" + const val DOC_LEVEL_QUERIES_INDEX = ".opendistro-alerting-queries" const val NO_ID = "" diff --git a/core/src/main/resources/mappings/doc-level-queries.json b/core/src/main/resources/mappings/doc-level-queries.json new file mode 100644 index 000000000..3e6145a16 --- /dev/null +++ b/core/src/main/resources/mappings/doc-level-queries.json @@ -0,0 +1,13 @@ +{ + "_meta": { + "schema_version": 5 + }, + "properties": { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + } + } +} \ No newline at end of file