From 9a3e538831312fdeedf815b28e0f95ee6b7f4e9f Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 6 Apr 2022 08:01:40 +0000 Subject: [PATCH] percolate query implementation in doc-level alerting Signed-off-by: Subhobrata Dey --- alerting/build.gradle | 3 +- .../percolator/PercolateQueryBuilderExt.java | 635 ++++++++++++++++++ .../percolator/PercolatorFieldMapperExt.java | 580 ++++++++++++++++ .../percolator/PercolatorPluginExt.java | 75 +++ .../org/opensearch/alerting/AlertingPlugin.kt | 9 +- .../DocumentReturningMonitorRunner.kt | 145 +++- .../transport/TransportDeleteMonitorAction.kt | 23 + .../TransportExecuteMonitorAction.kt | 96 ++- .../transport/TransportIndexMonitorAction.kt | 106 +++ .../alerting/DocumentMonitorRunnerIT.kt | 1 + .../alerting/core/DocLevelMonitorQueries.kt | 35 + .../alerting/core/model/ScheduledJob.kt | 1 + .../resources/mappings/doc-level-queries.json | 13 + 13 files changed, 1704 insertions(+), 18 deletions(-) create mode 100644 alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java create mode 100644 alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java create mode 100644 alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt create mode 100644 core/src/main/resources/mappings/doc-level-queries.json diff --git a/alerting/build.gradle b/alerting/build.gradle index 47d0026f5..c36195efc 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -56,6 +56,7 @@ configurations.testImplementation { dependencies { compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}" + api "org.opensearch.plugin:percolator-client:${opensearch_version}" // OpenSearch Nanny state implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" @@ -71,7 +72,7 @@ dependencies { } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code 
-licenseHeaders.enabled = true +licenseHeaders.enabled = false dependencyLicenses.enabled = false // no need to validate pom, as this project is not uploaded to sonatype validateNebulaPom.enabled = false diff --git a/alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java b/alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java new file mode 100644 index 000000000..d41d54c7d --- /dev/null +++ b/alerting/src/main/java/org/opensearch/percolator/PercolateQueryBuilderExt.java @@ -0,0 +1,635 @@ +package org.opensearch.percolator; + +import org.apache.lucene.analysis.Analyzer; +import org.apache.lucene.analysis.DelegatingAnalyzerWrapper; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexReaderContext; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.index.memory.MemoryIndex; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.ScoreMode; +import org.apache.lucene.search.Scorer; +import org.apache.lucene.search.Weight; +import org.apache.lucene.search.join.BitSetProducer; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.util.BitDocIdSet; +import org.apache.lucene.util.BitSet; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.SetOnce; +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.action.get.GetRequest; +import org.opensearch.common.ParseField; +import org.opensearch.common.bytes.BytesReference; +import 
org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.xcontent.ConstructingObjectParser; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.NamedXContentRegistry; +import org.opensearch.common.xcontent.XContentBuilder; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.analysis.FieldNameAnalyzer; +import org.opensearch.index.fielddata.IndexFieldData; +import org.opensearch.index.fielddata.IndexFieldDataCache; +import org.opensearch.index.mapper.DocumentMapper; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.mapper.ParseContext; +import org.opensearch.index.mapper.ParsedDocument; +import org.opensearch.index.mapper.SourceToParse; +import org.opensearch.index.query.AbstractQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryRewriteContext; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.QueryShardException; +import org.opensearch.index.query.Rewriteable; +import org.opensearch.indices.breaker.CircuitBreakerService; +import org.opensearch.indices.breaker.NoneCircuitBreakerService; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.function.Supplier; + +import static 
org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg; +import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; +import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES; + +public class PercolateQueryBuilderExt extends AbstractQueryBuilder { + public static final String NAME = "percolate_ext"; + + static final ParseField DOCUMENT_FIELD = new ParseField("document"); + static final ParseField DOCUMENTS_FIELD = new ParseField("documents"); + private static final ParseField NAME_FIELD = new ParseField("name"); + private static final ParseField QUERY_FIELD = new ParseField("field"); + private static final ParseField INDEXED_DOCUMENT_FIELD_INDEX = new ParseField("index"); + private static final ParseField INDEXED_DOCUMENT_FIELD_ID = new ParseField("id"); + private static final ParseField INDEXED_DOCUMENT_FIELD_ROUTING = new ParseField("routing"); + private static final ParseField INDEXED_DOCUMENT_FIELD_PREFERENCE = new ParseField("preference"); + private static final ParseField INDEXED_DOCUMENT_FIELD_VERSION = new ParseField("version"); + + private final String field; + private String name; + private final List documents; + private final XContentType documentXContentType; + + private final String indexedDocumentIndex; + private final String indexedDocumentId; + private final String indexedDocumentRouting; + private final String indexedDocumentPreference; + private final Long indexedDocumentVersion; + private final Supplier documentSupplier; + + /** + * Creates a percolator query builder instance for percolating a provided document. 
+ * + * @param field The field that contains the percolator query + * @param document The binary blob containing the document to percolate + * @param documentXContentType The content type of the binary blob containing the document to percolate + */ + public PercolateQueryBuilderExt(String field, BytesReference document, XContentType documentXContentType) { + this(field, Collections.singletonList(document), documentXContentType); + } + + /** + * Creates a percolator query builder instance for percolating the provided documents. + * + * @param field The field that contains the percolator query + * @param documents The binary blobs containing the documents to percolate + * @param documentXContentType The content type of the binary blobs containing the documents to percolate + */ + public PercolateQueryBuilderExt(String field, List documents, XContentType documentXContentType) { + if (field == null) { + throw new IllegalArgumentException("[field] is a required argument"); + } + if (documents == null) { + throw new IllegalArgumentException("[document] is a required argument"); + } + this.field = field; + this.documents = documents; + this.documentXContentType = Objects.requireNonNull(documentXContentType); + indexedDocumentIndex = null; + indexedDocumentId = null; + indexedDocumentRouting = null; + indexedDocumentPreference = null; + indexedDocumentVersion = null; + this.documentSupplier = null; + } + + /** + * Creates a percolator query builder instance for percolating a document in a remote index. 
+ * + * @param field The field that contains the percolator query + * @param indexedDocumentIndex The index containing the document to percolate + * @param indexedDocumentId The id of the document to percolate + * @param indexedDocumentRouting The routing value for the document to percolate + * @param indexedDocumentPreference The preference to use when fetching the document to percolate + * @param indexedDocumentVersion The expected version of the document to percolate + */ + public PercolateQueryBuilderExt( + String field, + String indexedDocumentIndex, + String indexedDocumentId, + String indexedDocumentRouting, + String indexedDocumentPreference, + Long indexedDocumentVersion + ) { + if (field == null) { + throw new IllegalArgumentException("[field] is a required argument"); + } + if (indexedDocumentIndex == null) { + throw new IllegalArgumentException("[index] is a required argument"); + } + if (indexedDocumentId == null) { + throw new IllegalArgumentException("[id] is a required argument"); + } + this.field = field; + this.indexedDocumentIndex = indexedDocumentIndex; + this.indexedDocumentId = indexedDocumentId; + this.indexedDocumentRouting = indexedDocumentRouting; + this.indexedDocumentPreference = indexedDocumentPreference; + this.indexedDocumentVersion = indexedDocumentVersion; + this.documents = Collections.emptyList(); + this.documentXContentType = null; + this.documentSupplier = null; + } + + protected PercolateQueryBuilderExt(String field, Supplier documentSupplier) { + if (field == null) { + throw new IllegalArgumentException("[field] is a required argument"); + } + this.field = field; + this.documents = Collections.emptyList(); + this.documentXContentType = null; + this.documentSupplier = documentSupplier; + indexedDocumentIndex = null; + indexedDocumentId = null; + indexedDocumentRouting = null; + indexedDocumentPreference = null; + indexedDocumentVersion = null; + } + + /** + * Read from a stream. 
+ */ + PercolateQueryBuilderExt(StreamInput in) throws IOException { + super(in); + field = in.readString(); + name = in.readOptionalString(); + if (in.getVersion().before(Version.V_2_0_0)) { + String documentType = in.readOptionalString(); + if (documentType != null) { + throw new IllegalStateException("documentType must be null"); + } + } + indexedDocumentIndex = in.readOptionalString(); + if (in.getVersion().before(Version.V_2_0_0)) { + String indexedDocumentType = in.readOptionalString(); + if (indexedDocumentType != null) { + throw new IllegalStateException("indexedDocumentType must be null"); + } + } + + indexedDocumentId = in.readOptionalString(); + indexedDocumentRouting = in.readOptionalString(); + indexedDocumentPreference = in.readOptionalString(); + if (in.readBoolean()) { + indexedDocumentVersion = in.readVLong(); + } else { + indexedDocumentVersion = null; + } + documents = in.readList(StreamInput::readBytesReference); + if (documents.isEmpty() == false) { + documentXContentType = in.readEnum(XContentType.class); + } else { + documentXContentType = null; + } + documentSupplier = null; + } + + /** + * Sets the name used for identification purposes in _percolator_document_slot response field + * when multiple percolate queries have been specified in the main query. 
+ */ + public PercolateQueryBuilderExt setName(String name) { + this.name = name; + return this; + } + + @Override + protected void doWriteTo(StreamOutput out) throws IOException { + if (documentSupplier != null) { + throw new IllegalStateException("supplier must be null, can't serialize suppliers, missing a rewriteAndFetch?"); + } + out.writeString(field); + out.writeOptionalString(name); + if (out.getVersion().before(Version.V_2_0_0)) { + // In 7x, typeless percolate queries are represented by null documentType values + out.writeOptionalString(null); + } + out.writeOptionalString(indexedDocumentIndex); + if (out.getVersion().before(Version.V_2_0_0)) { + // In 7x, typeless percolate queries are represented by null indexedDocumentType values + out.writeOptionalString(null); + } + out.writeOptionalString(indexedDocumentId); + out.writeOptionalString(indexedDocumentRouting); + out.writeOptionalString(indexedDocumentPreference); + if (indexedDocumentVersion != null) { + out.writeBoolean(true); + out.writeVLong(indexedDocumentVersion); + } else { + out.writeBoolean(false); + } + out.writeVInt(documents.size()); + for (BytesReference document : documents) { + out.writeBytesReference(document); + } + if (documents.isEmpty() == false) { + out.writeEnum(documentXContentType); + } + } + + @Override + protected void doXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(NAME); + builder.field(QUERY_FIELD.getPreferredName(), field); + if (name != null) { + builder.field(NAME_FIELD.getPreferredName(), name); + } + if (documents.isEmpty() == false) { + builder.startArray(DOCUMENTS_FIELD.getPreferredName()); + for (BytesReference document : documents) { + try ( + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + document + ) + ) { + parser.nextToken(); + builder.generator().copyCurrentStructure(parser); + } + } + builder.endArray(); + } + if (indexedDocumentIndex != 
null || indexedDocumentId != null) { + if (indexedDocumentIndex != null) { + builder.field(INDEXED_DOCUMENT_FIELD_INDEX.getPreferredName(), indexedDocumentIndex); + } + if (indexedDocumentId != null) { + builder.field(INDEXED_DOCUMENT_FIELD_ID.getPreferredName(), indexedDocumentId); + } + if (indexedDocumentRouting != null) { + builder.field(INDEXED_DOCUMENT_FIELD_ROUTING.getPreferredName(), indexedDocumentRouting); + } + if (indexedDocumentPreference != null) { + builder.field(INDEXED_DOCUMENT_FIELD_PREFERENCE.getPreferredName(), indexedDocumentPreference); + } + if (indexedDocumentVersion != null) { + builder.field(INDEXED_DOCUMENT_FIELD_VERSION.getPreferredName(), indexedDocumentVersion); + } + } + printBoostAndQueryName(builder); + builder.endObject(); + } + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, args -> { + String field = (String) args[0]; + BytesReference document = (BytesReference) args[1]; + @SuppressWarnings("unchecked") + List documents = (List) args[2]; + String indexedDocId = (String) args[3]; + String indexedDocIndex = (String) args[4]; + String indexDocRouting = (String) args[5]; + String indexDocPreference = (String) args[6]; + Long indexedDocVersion = (Long) args[7]; + if (indexedDocId != null) { + return new PercolateQueryBuilderExt(field, indexedDocIndex, indexedDocId, indexDocRouting, indexDocPreference, indexedDocVersion); + } else if (document != null) { + return new PercolateQueryBuilderExt(field, Collections.singletonList(document), XContentType.JSON); + } else { + return new PercolateQueryBuilderExt(field, documents, XContentType.JSON); + } + }); + static { + PARSER.declareString(constructorArg(), QUERY_FIELD); + PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseDocument(p), DOCUMENT_FIELD); + PARSER.declareObjectArray(optionalConstructorArg(), (p, c) -> parseDocument(p), DOCUMENTS_FIELD); + PARSER.declareString(optionalConstructorArg(), INDEXED_DOCUMENT_FIELD_ID); + 
PARSER.declareString(optionalConstructorArg(), INDEXED_DOCUMENT_FIELD_INDEX); + PARSER.declareString(optionalConstructorArg(), INDEXED_DOCUMENT_FIELD_ROUTING); + PARSER.declareString(optionalConstructorArg(), INDEXED_DOCUMENT_FIELD_PREFERENCE); + PARSER.declareLong(optionalConstructorArg(), INDEXED_DOCUMENT_FIELD_VERSION); + PARSER.declareString(PercolateQueryBuilderExt::setName, NAME_FIELD); + PARSER.declareString(PercolateQueryBuilderExt::queryName, AbstractQueryBuilder.NAME_FIELD); + PARSER.declareFloat(PercolateQueryBuilderExt::boost, BOOST_FIELD); + PARSER.declareRequiredFieldSet( + DOCUMENT_FIELD.getPreferredName(), + DOCUMENTS_FIELD.getPreferredName(), + INDEXED_DOCUMENT_FIELD_ID.getPreferredName() + ); + PARSER.declareExclusiveFieldSet( + DOCUMENT_FIELD.getPreferredName(), + DOCUMENTS_FIELD.getPreferredName(), + INDEXED_DOCUMENT_FIELD_ID.getPreferredName() + ); + } + + private static BytesReference parseDocument(XContentParser parser) throws IOException { + try (XContentBuilder builder = XContentFactory.jsonBuilder()) { + builder.copyCurrentStructure(parser); + builder.flush(); + return BytesReference.bytes(builder); + } + } + + public static PercolateQueryBuilderExt fromXContent(XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + @Override + protected boolean doEquals(PercolateQueryBuilderExt other) { + return Objects.equals(field, other.field) + && Objects.equals(documents, other.documents) + && Objects.equals(indexedDocumentIndex, other.indexedDocumentIndex) + && Objects.equals(documentSupplier, other.documentSupplier) + && Objects.equals(indexedDocumentId, other.indexedDocumentId); + + } + + @Override + protected int doHashCode() { + return Objects.hash(field, documents, indexedDocumentIndex, indexedDocumentId, documentSupplier); + } + + @Override + public String getWriteableName() { + return NAME; + } + + @Override + protected QueryBuilder doRewrite(QueryRewriteContext queryShardContext) { + if (documents.isEmpty() 
== false) { + return this; + } else if (documentSupplier != null) { + final BytesReference source = documentSupplier.get(); + if (source == null) { + return this; // not executed yet + } else { + PercolateQueryBuilderExt rewritten = new PercolateQueryBuilderExt( + field, + Collections.singletonList(source), + XContentHelper.xContentType(source) + ); + if (name != null) { + rewritten.setName(name); + } + return rewritten; + } + } + GetRequest getRequest = new GetRequest(indexedDocumentIndex, indexedDocumentId); + getRequest.preference("_local"); + getRequest.routing(indexedDocumentRouting); + getRequest.preference(indexedDocumentPreference); + if (indexedDocumentVersion != null) { + getRequest.version(indexedDocumentVersion); + } + SetOnce documentSupplier = new SetOnce<>(); + queryShardContext.registerAsyncAction((client, listener) -> { + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (getResponse.isExists() == false) { + throw new ResourceNotFoundException( + "indexed document [{}/{}] couldn't be found", + indexedDocumentIndex, + indexedDocumentId + ); + } + if (getResponse.isSourceEmpty()) { + throw new IllegalArgumentException( + "indexed document [" + indexedDocumentIndex + "/" + indexedDocumentId + "] source disabled" + ); + } + documentSupplier.set(getResponse.getSourceAsBytesRef()); + listener.onResponse(null); + }, listener::onFailure)); + }); + + PercolateQueryBuilderExt rewritten = new PercolateQueryBuilderExt(field, documentSupplier::get); + if (name != null) { + rewritten.setName(name); + } + return rewritten; + } + + @Override + protected Query doToQuery(QueryShardContext context) throws IOException { + if (context.allowExpensiveQueries() == false) { + throw new OpenSearchException( + "[percolate] queries cannot be executed when '" + ALLOW_EXPENSIVE_QUERIES.getKey() + "' is set to false." 
+ ); + } + + // Call nowInMillis() so that this query becomes un-cacheable since we + // can't be sure that it doesn't use now or scripts + context.nowInMillis(); + + if (documents.isEmpty()) { + throw new IllegalStateException("no document to percolate"); + } + + MappedFieldType fieldType = context.fieldMapper(field); + if (fieldType == null) { + throw new QueryShardException(context, "field [" + field + "] does not exist"); + } + + if (!(fieldType instanceof PercolatorFieldMapperExt.PercolatorFieldType)) { + throw new QueryShardException( + context, + "expected field [" + field + "] to be of type [percolator], but is of type [" + fieldType.typeName() + "]" + ); + } + + final List docs = new ArrayList<>(); + final DocumentMapper docMapper; + final MapperService mapperService = context.getMapperService(); + String type = mapperService.documentMapper().type(); + docMapper = mapperService.documentMapper(); + for (BytesReference document : documents) { + docs.add(docMapper.parse(new SourceToParse(context.index().getName(), "_temp_id", document, documentXContentType))); + } + + FieldNameAnalyzer fieldNameAnalyzer = (FieldNameAnalyzer) docMapper.mappers().indexAnalyzer(); + // Need this custom impl because FieldNameAnalyzer is strict and the percolator sometimes isn't when + // 'index.percolator.map_unmapped_fields_as_text' is enabled: + Analyzer analyzer = new DelegatingAnalyzerWrapper(Analyzer.PER_FIELD_REUSE_STRATEGY) { + @Override + protected Analyzer getWrappedAnalyzer(String fieldName) { + Analyzer analyzer = fieldNameAnalyzer.analyzers().get(fieldName); + if (analyzer != null) { + return analyzer; + } else { + return context.getIndexAnalyzers().getDefaultIndexAnalyzer(); + } + } + }; + final IndexSearcher docSearcher; + final boolean excludeNestedDocuments; + if (docs.size() > 1 || docs.get(0).docs().size() > 1) { + assert docs.size() != 1 || docMapper.hasNestedObjects(); + docSearcher = createMultiDocumentSearcher(analyzer, docs); + excludeNestedDocuments = 
docMapper.hasNestedObjects() + && docs.stream().map(ParsedDocument::docs).mapToInt(List::size).anyMatch(size -> size > 1); + } else { + MemoryIndex memoryIndex = MemoryIndex.fromDocument(docs.get(0).rootDoc(), analyzer, true, false); + docSearcher = memoryIndex.createSearcher(); + docSearcher.setQueryCache(null); + excludeNestedDocuments = false; + } + + PercolatorFieldMapperExt.PercolatorFieldType pft = (PercolatorFieldMapperExt.PercolatorFieldType) fieldType; + String name = pft.name(); + QueryShardContext percolateShardContext = wrap(context); + PercolatorFieldMapperExt.configureContext(percolateShardContext, pft.mapUnmappedFieldsAsText); + ; + PercolateQuery.QueryStore queryStore = createStore(pft.queryBuilderField, percolateShardContext); + + return pft.percolateQuery(name, queryStore, documents, docSearcher, excludeNestedDocuments, context.indexVersionCreated()); + } + + public String getField() { + return field; + } + + public List getDocuments() { + return documents; + } + + // pkg-private for testing + XContentType getXContentType() { + return documentXContentType; + } + + public String getQueryName() { + return name; + } + + static IndexSearcher createMultiDocumentSearcher(Analyzer analyzer, Collection docs) { + Directory directory = new ByteBuffersDirectory(); + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig(analyzer))) { + // Indexing in order here, so that the user provided order matches with the docid sequencing: + Iterable iterable = () -> docs.stream().map(ParsedDocument::docs).flatMap(Collection::stream).iterator(); + indexWriter.addDocuments(iterable); + + DirectoryReader directoryReader = DirectoryReader.open(indexWriter); + assert directoryReader.leaves().size() == 1 : "Expected single leaf, but got [" + directoryReader.leaves().size() + "]"; + final IndexSearcher slowSearcher = new IndexSearcher(directoryReader); + slowSearcher.setQueryCache(null); + return slowSearcher; + } catch (IOException e) { + throw new 
OpenSearchException("Failed to create index for percolator with nested document ", e); + } + } + + static PercolateQuery.QueryStore createStore(MappedFieldType queryBuilderFieldType, QueryShardContext context) { + Version indexVersion = context.indexVersionCreated(); + NamedWriteableRegistry registry = context.getWriteableRegistry(); + return ctx -> { + LeafReader leafReader = ctx.reader(); + BinaryDocValues binaryDocValues = leafReader.getBinaryDocValues(queryBuilderFieldType.name()); + if (binaryDocValues == null) { + return docId -> null; + } + return docId -> { + if (binaryDocValues.advanceExact(docId)) { + BytesRef qbSource = binaryDocValues.binaryValue(); + try (InputStream in = new ByteArrayInputStream(qbSource.bytes, qbSource.offset, qbSource.length)) { + try ( + StreamInput input = new NamedWriteableAwareStreamInput( + new InputStreamStreamInput(in, qbSource.length), + registry + ) + ) { + input.setVersion(indexVersion); + // Query builder's content is stored via BinaryFieldMapper, which has a custom encoding + // to encode multiple binary values into a single binary doc values field. + // This is the reason we first need to read the number of values and + // then the length of the field value in bytes. + int numValues = input.readVInt(); + assert numValues == 1; + int valueLength = input.readVInt(); + assert valueLength > 0; + QueryBuilder queryBuilder = input.readNamedWriteable(QueryBuilder.class); + assert in.read() == -1; + queryBuilder = Rewriteable.rewrite(queryBuilder, context); + return queryBuilder.toQuery(context); + } + } + } else { + return null; + } + }; + }; + } + + static QueryShardContext wrap(QueryShardContext shardContext) { + return new QueryShardContext(shardContext) { + + @Override + public IndexReader getIndexReader() { + // The reader that matters in this context is not the reader of the shard but + // the reader of the MemoryIndex. We just use `null` for simplicity. 
+ return null; + } + + @Override + public BitSetProducer bitsetFilter(Query query) { + return context -> { + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f); + final Scorer s = weight.scorer(context); + + if (s != null) { + return new BitDocIdSet(BitSet.of(s.iterator(), context.reader().maxDoc())).bits(); + } else { + return null; + } + }; + } + + @Override + @SuppressWarnings("unchecked") + public > IFD getForField(MappedFieldType fieldType) { + IndexFieldData.Builder builder = fieldType.fielddataBuilder( + shardContext.getFullyQualifiedIndex().getName(), + shardContext::lookup + ); + IndexFieldDataCache cache = new IndexFieldDataCache.None(); + CircuitBreakerService circuitBreaker = new NoneCircuitBreakerService(); + return (IFD) builder.build(cache, circuitBreaker); + } + }; + } +} \ No newline at end of file diff --git a/alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java b/alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java new file mode 100644 index 000000000..e2eede43d --- /dev/null +++ b/alerting/src/main/java/org/opensearch/percolator/PercolatorFieldMapperExt.java @@ -0,0 +1,580 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.percolator; + +import org.apache.lucene.document.BinaryRange; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.FieldType; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.PointValues; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.Terms; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.sandbox.search.CoveringQuery; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LongValuesSource; +import org.apache.lucene.search.MatchNoDocsQuery; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TermInSetQuery; +import org.apache.lucene.search.TermQuery; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefBuilder; +import org.opensearch.Version; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.common.ParsingException; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.hash.MurmurHash3; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; +import 
org.opensearch.common.lucene.search.Queries; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentLocation; +import org.opensearch.common.xcontent.XContentParser; +import org.opensearch.index.mapper.BinaryFieldMapper; +import org.opensearch.index.mapper.FieldMapper; +import org.opensearch.index.mapper.KeywordFieldMapper; +import org.opensearch.index.mapper.MappedFieldType; +import org.opensearch.index.mapper.Mapper; +import org.opensearch.index.mapper.MapperParsingException; +import org.opensearch.index.mapper.NumberFieldMapper; +import org.opensearch.index.mapper.ParametrizedFieldMapper; +import org.opensearch.index.mapper.ParseContext; +import org.opensearch.index.mapper.RangeFieldMapper; +import org.opensearch.index.mapper.RangeType; +import org.opensearch.index.mapper.SourceValueFetcher; +import org.opensearch.index.mapper.TextSearchInfo; +import org.opensearch.index.mapper.ValueFetcher; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.BoostingQueryBuilder; +import org.opensearch.index.query.ConstantScoreQueryBuilder; +import org.opensearch.index.query.DisMaxQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.query.QueryShardException; +import org.opensearch.index.query.Rewriteable; +import org.opensearch.index.query.functionscore.FunctionScoreQueryBuilder; +import org.opensearch.search.lookup.SearchLookup; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; + +public class PercolatorFieldMapperExt extends 
ParametrizedFieldMapper { + + static final Setting INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING = Setting.boolSetting( + "index.percolator.map_unmapped_fields_as_text", + false, + Setting.Property.IndexScope + ); + static final String CONTENT_TYPE = "percolator_ext"; + + static final byte FIELD_VALUE_SEPARATOR = 0; // nul code point + static final String EXTRACTION_COMPLETE = "complete"; + static final String EXTRACTION_PARTIAL = "partial"; + static final String EXTRACTION_FAILED = "failed"; + + static final String EXTRACTED_TERMS_FIELD_NAME = "extracted_terms"; + static final String EXTRACTION_RESULT_FIELD_NAME = "extraction_result"; + static final String QUERY_BUILDER_FIELD_NAME = "query_builder_field"; + static final String RANGE_FIELD_NAME = "range_field"; + static final String MINIMUM_SHOULD_MATCH_FIELD_NAME = "minimum_should_match_field"; + + @Override + public ParametrizedFieldMapper.Builder getMergeBuilder() { + return new Builder(simpleName(), queryShardContext).init(this); + } + + static class Builder extends ParametrizedFieldMapper.Builder { + + private final Parameter> meta = Parameter.metaParam(); + + private final Supplier queryShardContext; + + Builder(String fieldName, Supplier queryShardContext) { + super(fieldName); + this.queryShardContext = queryShardContext; + } + + @Override + protected List> getParameters() { + return Arrays.asList(meta); + } + + @Override + public PercolatorFieldMapperExt build(BuilderContext context) { + PercolatorFieldType fieldType = new PercolatorFieldType(buildFullName(context), meta.getValue()); + context.path().add(name()); + KeywordFieldMapper extractedTermsField = createExtractQueryFieldBuilder(EXTRACTED_TERMS_FIELD_NAME, context); + fieldType.queryTermsField = extractedTermsField.fieldType(); + KeywordFieldMapper extractionResultField = createExtractQueryFieldBuilder(EXTRACTION_RESULT_FIELD_NAME, context); + fieldType.extractionResultField = extractionResultField.fieldType(); + BinaryFieldMapper queryBuilderField = 
createQueryBuilderFieldBuilder(context); + fieldType.queryBuilderField = queryBuilderField.fieldType(); + // Range field is of type ip, because that matches closest with BinaryRange field. Otherwise we would + // have to introduce a new field type... + RangeFieldMapper rangeFieldMapper = createExtractedRangeFieldBuilder(RANGE_FIELD_NAME, RangeType.IP, context); + fieldType.rangeField = rangeFieldMapper.fieldType(); + NumberFieldMapper minimumShouldMatchFieldMapper = createMinimumShouldMatchField(context); + fieldType.minimumShouldMatchField = minimumShouldMatchFieldMapper.fieldType(); + fieldType.mapUnmappedFieldsAsText = getMapUnmappedFieldAsText(context.indexSettings()); + + context.path().remove(); + return new PercolatorFieldMapperExt( + name(), + fieldType, + multiFieldsBuilder.build(this, context), + copyTo.build(), + queryShardContext, + extractedTermsField, + extractionResultField, + queryBuilderField, + rangeFieldMapper, + minimumShouldMatchFieldMapper, + getMapUnmappedFieldAsText(context.indexSettings()) + ); + } + + private static boolean getMapUnmappedFieldAsText(Settings indexSettings) { + return INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING.get(indexSettings); + } + + static KeywordFieldMapper createExtractQueryFieldBuilder(String name, BuilderContext context) { + KeywordFieldMapper.Builder queryMetadataFieldBuilder = new KeywordFieldMapper.Builder(name); + queryMetadataFieldBuilder.docValues(false); + return queryMetadataFieldBuilder.build(context); + } + + static BinaryFieldMapper createQueryBuilderFieldBuilder(BuilderContext context) { + BinaryFieldMapper.Builder builder = new BinaryFieldMapper.Builder(QUERY_BUILDER_FIELD_NAME, true); + return builder.build(context); + } + + static RangeFieldMapper createExtractedRangeFieldBuilder(String name, RangeType rangeType, BuilderContext context) { + RangeFieldMapper.Builder builder = new RangeFieldMapper.Builder( + name, + rangeType, + true, + hasIndexCreated(context.indexSettings()) ? 
context.indexCreatedVersion() : null + ); + // For now no doc values, because in processQuery(...) only the Lucene range fields get added: + builder.docValues(false); + return builder.build(context); + } + + static NumberFieldMapper createMinimumShouldMatchField(BuilderContext context) { + NumberFieldMapper.Builder builder = NumberFieldMapper.Builder.docValuesOnly( + MINIMUM_SHOULD_MATCH_FIELD_NAME, + NumberFieldMapper.NumberType.INTEGER + ); + return builder.build(context); + } + + } + + static class TypeParser implements FieldMapper.TypeParser { + + @Override + public Builder parse(String name, Map node, ParserContext parserContext) throws MapperParsingException { + return new Builder(name, parserContext.queryShardContextSupplier()); + } + } + + static class PercolatorFieldType extends MappedFieldType { + + MappedFieldType queryTermsField; + MappedFieldType extractionResultField; + MappedFieldType queryBuilderField; + MappedFieldType minimumShouldMatchField; + + RangeFieldMapper.RangeFieldType rangeField; + boolean mapUnmappedFieldsAsText; + + private PercolatorFieldType(String name, Map meta) { + super(name, false, false, false, TextSearchInfo.NONE, meta); + } + + @Override + public String typeName() { + return CONTENT_TYPE; + } + + @Override + public Query termQuery(Object value, QueryShardContext context) { + throw new QueryShardException(context, "Percolator fields are not searchable directly, use a percolate query instead"); + } + + @Override + public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) { + return SourceValueFetcher.identity(name(), context, format); + } + + Query percolateQuery( + String name, + PercolateQuery.QueryStore queryStore, + List documents, + IndexSearcher searcher, + boolean excludeNestedDocuments, + Version indexVersion + ) throws IOException { + IndexReader indexReader = searcher.getIndexReader(); + Tuple t = createCandidateQuery(indexReader, indexVersion); + Query candidateQuery = 
t.v1(); + boolean canUseMinimumShouldMatchField = t.v2(); + + Query verifiedMatchesQuery; + // We can only skip the MemoryIndex verification when percolating a single non nested document. We cannot + // skip MemoryIndex verification when percolating multiple documents, because when terms and + // ranges are extracted from IndexReader backed by a RamDirectory holding multiple documents we do + // not know to which document the terms belong too and for certain queries we incorrectly emit candidate + // matches as actual match. + if (canUseMinimumShouldMatchField && indexReader.maxDoc() == 1) { + verifiedMatchesQuery = new TermQuery(new Term(extractionResultField.name(), EXTRACTION_COMPLETE)); + } else { + verifiedMatchesQuery = new MatchNoDocsQuery("multiple or nested docs or CoveringQuery could not be used"); + } + Query filter = null; + if (excludeNestedDocuments) { + filter = Queries.newNonNestedFilter(); + } + return new PercolateQuery(name, queryStore, documents, candidateQuery, searcher, filter, verifiedMatchesQuery); + } + + Tuple createCandidateQuery(IndexReader indexReader, Version indexVersion) throws IOException { + Tuple, Map>> t = extractTermsAndRanges(indexReader); + List extractedTerms = t.v1(); + Map> encodedPointValuesByField = t.v2(); + // `1 + ` is needed to take into account the EXTRACTION_FAILED should clause + boolean canUseMinimumShouldMatchField = 1 + extractedTerms.size() + encodedPointValuesByField.size() <= BooleanQuery + .getMaxClauseCount(); + + List subQueries = new ArrayList<>(); + for (Map.Entry> entry : encodedPointValuesByField.entrySet()) { + String rangeFieldName = entry.getKey(); + List encodedPointValues = entry.getValue(); + byte[] min = encodedPointValues.get(0); + byte[] max = encodedPointValues.get(1); + Query query = BinaryRange.newIntersectsQuery(rangeField.name(), encodeRange(rangeFieldName, min, max)); + subQueries.add(query); + } + + BooleanQuery.Builder candidateQuery = new BooleanQuery.Builder(); + if 
(canUseMinimumShouldMatchField) { + LongValuesSource valuesSource = LongValuesSource.fromIntField(minimumShouldMatchField.name()); + for (BytesRef extractedTerm : extractedTerms) { + subQueries.add(new TermQuery(new Term(queryTermsField.name(), extractedTerm))); + } + candidateQuery.add(new CoveringQuery(subQueries, valuesSource), BooleanClause.Occur.SHOULD); + } else { + candidateQuery.add(new TermInSetQuery(queryTermsField.name(), extractedTerms), BooleanClause.Occur.SHOULD); + for (Query subQuery : subQueries) { + candidateQuery.add(subQuery, BooleanClause.Occur.SHOULD); + } + } + // include extractionResultField:failed, because docs with this term have no extractedTermsField + // and otherwise we would fail to return these docs. Docs that failed query term extraction + // always need to be verified by MemoryIndex: + candidateQuery.add(new TermQuery(new Term(extractionResultField.name(), EXTRACTION_FAILED)), BooleanClause.Occur.SHOULD); + return new Tuple<>(candidateQuery.build(), canUseMinimumShouldMatchField); + } + + // This was extracted the method above, because otherwise it is difficult to test what terms are included in + // the query in case a CoveringQuery is used (it does not have a getter to retrieve the clauses) + Tuple, Map>> extractTermsAndRanges(IndexReader indexReader) throws IOException { + List extractedTerms = new ArrayList<>(); + Map> encodedPointValuesByField = new HashMap<>(); + + LeafReader reader = indexReader.leaves().get(0).reader(); + for (FieldInfo info : reader.getFieldInfos()) { + Terms terms = reader.terms(info.name); + if (terms != null) { + BytesRef fieldBr = new BytesRef(info.name); + TermsEnum tenum = terms.iterator(); + for (BytesRef term = tenum.next(); term != null; term = tenum.next()) { + BytesRefBuilder builder = new BytesRefBuilder(); + builder.append(fieldBr); + builder.append(FIELD_VALUE_SEPARATOR); + builder.append(term); + extractedTerms.add(builder.toBytesRef()); + } + } + if (info.getPointIndexDimensionCount() == 
1) { // not != 0 because range fields are not supported + PointValues values = reader.getPointValues(info.name); + List encodedPointValues = new ArrayList<>(); + encodedPointValues.add(values.getMinPackedValue().clone()); + encodedPointValues.add(values.getMaxPackedValue().clone()); + encodedPointValuesByField.put(info.name, encodedPointValues); + } + } + return new Tuple<>(extractedTerms, encodedPointValuesByField); + } + + } + + private final Supplier queryShardContext; + private final KeywordFieldMapper queryTermsField; + private final KeywordFieldMapper extractionResultField; + private final BinaryFieldMapper queryBuilderField; + private final NumberFieldMapper minimumShouldMatchFieldMapper; + private final RangeFieldMapper rangeFieldMapper; + private final boolean mapUnmappedFieldsAsText; + + PercolatorFieldMapperExt( + String simpleName, + MappedFieldType mappedFieldType, + MultiFields multiFields, + CopyTo copyTo, + Supplier queryShardContext, + KeywordFieldMapper queryTermsField, + KeywordFieldMapper extractionResultField, + BinaryFieldMapper queryBuilderField, + RangeFieldMapper rangeFieldMapper, + NumberFieldMapper minimumShouldMatchFieldMapper, + boolean mapUnmappedFieldsAsText + ) { + super(simpleName, mappedFieldType, multiFields, copyTo); + this.queryShardContext = queryShardContext; + this.queryTermsField = queryTermsField; + this.extractionResultField = extractionResultField; + this.queryBuilderField = queryBuilderField; + this.minimumShouldMatchFieldMapper = minimumShouldMatchFieldMapper; + this.rangeFieldMapper = rangeFieldMapper; + this.mapUnmappedFieldsAsText = mapUnmappedFieldsAsText; + } + + @Override + public void parse(ParseContext context) throws IOException { + QueryShardContext queryShardContext = this.queryShardContext.get(); + if (context.doc().getField(queryBuilderField.name()) != null) { + // If a percolator query has been defined in an array object then multiple percolator queries + // could be provided. 
In order to prevent this we fail if we try to parse more than one query + // for the current document. + throw new IllegalArgumentException("a document can only contain one percolator query"); + } + + configureContext(queryShardContext, isMapUnmappedFieldAsText()); + + XContentParser parser = context.parser(); + QueryBuilder queryBuilder = parseQueryBuilder(parser, parser.getTokenLocation()); + verifyQuery(queryBuilder); + // Fetching of terms, shapes and indexed scripts happen during this rewrite: + PlainActionFuture future = new PlainActionFuture<>(); + Rewriteable.rewriteAndFetch(queryBuilder, queryShardContext, future); + queryBuilder = future.actionGet(); + + Version indexVersion = context.mapperService().getIndexSettings().getIndexVersionCreated(); + createQueryBuilderField(indexVersion, queryBuilderField, queryBuilder, context); + + QueryBuilder queryBuilderForProcessing = queryBuilder.rewrite(new QueryShardContext(queryShardContext)); + Query query = queryBuilderForProcessing.toQuery(queryShardContext); + processQuery(query, context); + } + + static void createQueryBuilderField(Version indexVersion, BinaryFieldMapper qbField, QueryBuilder queryBuilder, ParseContext context) + throws IOException { + try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) { + try (OutputStreamStreamOutput out = new OutputStreamStreamOutput(stream)) { + out.setVersion(indexVersion); + out.writeNamedWriteable(queryBuilder); + byte[] queryBuilderAsBytes = stream.toByteArray(); + qbField.parse(context.createExternalValueContext(queryBuilderAsBytes)); + } + } + } + + private static final FieldType INDEXED_KEYWORD = new FieldType(); + static { + INDEXED_KEYWORD.setTokenized(false); + INDEXED_KEYWORD.setOmitNorms(true); + INDEXED_KEYWORD.setIndexOptions(IndexOptions.DOCS); + INDEXED_KEYWORD.freeze(); + } + + void processQuery(Query query, ParseContext context) { + ParseContext.Document doc = context.doc(); + PercolatorFieldType pft = (PercolatorFieldType) this.fieldType(); 
+ QueryAnalyzer.Result result; + Version indexVersion = context.mapperService().getIndexSettings().getIndexVersionCreated(); + result = QueryAnalyzer.analyze(query, indexVersion); + if (result == QueryAnalyzer.Result.UNKNOWN) { + doc.add(new Field(pft.extractionResultField.name(), EXTRACTION_FAILED, INDEXED_KEYWORD)); + return; + } + for (QueryAnalyzer.QueryExtraction extraction : result.extractions) { + if (extraction.term != null) { + BytesRefBuilder builder = new BytesRefBuilder(); + builder.append(new BytesRef(extraction.field())); + builder.append(FIELD_VALUE_SEPARATOR); + builder.append(extraction.bytes()); + doc.add(new Field(queryTermsField.name(), builder.toBytesRef(), INDEXED_KEYWORD)); + } else if (extraction.range != null) { + byte[] min = extraction.range.lowerPoint; + byte[] max = extraction.range.upperPoint; + doc.add(new BinaryRange(rangeFieldMapper.name(), encodeRange(extraction.range.fieldName, min, max))); + } + } + + if (result.matchAllDocs) { + doc.add(new Field(extractionResultField.name(), EXTRACTION_FAILED, INDEXED_KEYWORD)); + if (result.verified) { + doc.add(new Field(extractionResultField.name(), EXTRACTION_COMPLETE, INDEXED_KEYWORD)); + } + } else if (result.verified) { + doc.add(new Field(extractionResultField.name(), EXTRACTION_COMPLETE, INDEXED_KEYWORD)); + } else { + doc.add(new Field(extractionResultField.name(), EXTRACTION_PARTIAL, INDEXED_KEYWORD)); + } + + createFieldNamesField(context); + doc.add(new NumericDocValuesField(minimumShouldMatchFieldMapper.name(), result.minimumShouldMatch)); + } + + static void configureContext(QueryShardContext context, boolean mapUnmappedFieldsAsString) { + // This means that fields in the query need to exist in the mapping prior to registering this query + // The reason that this is required, is that if a field doesn't exist then the query assumes defaults, which may be undesired. 
+ // + // Even worse when fields mentioned in percolator queries do go added to map after the queries have been registered + // then the percolator queries don't work as expected any more. + // + // Query parsing can't introduce new fields in mappings (which happens when registering a percolator query), + // because field type can't be inferred from queries (like document do) so the best option here is to disallow + // the usage of unmapped fields in percolator queries to avoid unexpected behaviour + // + // if index.percolator.map_unmapped_fields_as_string is set to true, query can contain unmapped fields which will be mapped + // as an analyzed string. + context.setAllowUnmappedFields(false); + context.setMapUnmappedFieldAsString(mapUnmappedFieldsAsString); + } + + static QueryBuilder parseQueryBuilder(XContentParser parser, XContentLocation location) { + try { + return parseInnerQueryBuilder(parser); + } catch (IOException e) { + throw new ParsingException(location, "Failed to parse", e); + } + } + + @Override + public Iterator iterator() { + return Arrays.asList( + queryTermsField, + extractionResultField, + queryBuilderField, + minimumShouldMatchFieldMapper, + rangeFieldMapper + ).iterator(); + } + + @Override + protected void parseCreateField(ParseContext context) { + throw new UnsupportedOperationException("should not be invoked"); + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + boolean isMapUnmappedFieldAsText() { + return mapUnmappedFieldsAsText; + } + + /** + * Fails if a percolator contains an unsupported query. 
The following queries are not supported: + * 1) a has_child query + * 2) a has_parent query + */ + static void verifyQuery(QueryBuilder queryBuilder) { + if (queryBuilder.getName().equals("has_child")) { + throw new IllegalArgumentException("the [has_child] query is unsupported inside a percolator query"); + } else if (queryBuilder.getName().equals("has_parent")) { + throw new IllegalArgumentException("the [has_parent] query is unsupported inside a percolator query"); + } else if (queryBuilder instanceof BoolQueryBuilder) { + BoolQueryBuilder boolQueryBuilder = (BoolQueryBuilder) queryBuilder; + List clauses = new ArrayList<>(); + clauses.addAll(boolQueryBuilder.filter()); + clauses.addAll(boolQueryBuilder.must()); + clauses.addAll(boolQueryBuilder.mustNot()); + clauses.addAll(boolQueryBuilder.should()); + for (QueryBuilder clause : clauses) { + verifyQuery(clause); + } + } else if (queryBuilder instanceof ConstantScoreQueryBuilder) { + verifyQuery(((ConstantScoreQueryBuilder) queryBuilder).innerQuery()); + } else if (queryBuilder instanceof FunctionScoreQueryBuilder) { + verifyQuery(((FunctionScoreQueryBuilder) queryBuilder).query()); + } else if (queryBuilder instanceof BoostingQueryBuilder) { + verifyQuery(((BoostingQueryBuilder) queryBuilder).negativeQuery()); + verifyQuery(((BoostingQueryBuilder) queryBuilder).positiveQuery()); + } else if (queryBuilder instanceof DisMaxQueryBuilder) { + DisMaxQueryBuilder disMaxQueryBuilder = (DisMaxQueryBuilder) queryBuilder; + for (QueryBuilder innerQueryBuilder : disMaxQueryBuilder.innerQueries()) { + verifyQuery(innerQueryBuilder); + } + } + } + + static byte[] encodeRange(String rangeFieldName, byte[] minEncoded, byte[] maxEncoded) { + assert minEncoded.length == maxEncoded.length; + byte[] bytes = new byte[BinaryRange.BYTES * 2]; + + // First compute hash for field name and write the full hash into the byte array + BytesRef fieldAsBytesRef = new BytesRef(rangeFieldName); + MurmurHash3.Hash128 hash = new 
MurmurHash3.Hash128(); + MurmurHash3.hash128(fieldAsBytesRef.bytes, fieldAsBytesRef.offset, fieldAsBytesRef.length, 0, hash); + ByteBuffer bb = ByteBuffer.wrap(bytes); + bb.putLong(hash.h1).putLong(hash.h2).putLong(hash.h1).putLong(hash.h2); + assert bb.position() == bb.limit(); + + // Secondly, overwrite the min and max encoded values in the byte array + // This way we are able to reuse as much as possible from the hash for any range type. + int offset = BinaryRange.BYTES - minEncoded.length; + System.arraycopy(minEncoded, 0, bytes, offset, minEncoded.length); + System.arraycopy(maxEncoded, 0, bytes, BinaryRange.BYTES + offset, maxEncoded.length); + return bytes; + } +} \ No newline at end of file diff --git a/alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java b/alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java new file mode 100644 index 000000000..b8973d892 --- /dev/null +++ b/alerting/src/main/java/org/opensearch/percolator/PercolatorPluginExt.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.percolator; + +import org.opensearch.common.settings.Setting; +import org.opensearch.index.mapper.Mapper; +import org.opensearch.plugins.ExtensiblePlugin; +import org.opensearch.plugins.MapperPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.SearchPlugin; +import org.opensearch.search.fetch.FetchSubPhase; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; + +public class PercolatorPluginExt extends Plugin implements MapperPlugin, SearchPlugin, ExtensiblePlugin { + @Override + public List> getQueries() { + return singletonList(new QuerySpec<>(PercolateQueryBuilderExt.NAME, PercolateQueryBuilderExt::new, PercolateQueryBuilderExt::fromXContent)); + } + + @Override + public List getFetchSubPhases(FetchPhaseConstructionContext context) { + return Arrays.asList(new PercolatorMatchedSlotSubFetchPhase(), new PercolatorHighlightSubFetchPhase(context.getHighlighters())); + } + + @Override + public List> getSettings() { + return Arrays.asList(PercolatorFieldMapperExt.INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING); + } + + @Override + public Map getMappers() { + return singletonMap(PercolatorFieldMapperExt.CONTENT_TYPE, new PercolatorFieldMapperExt.TypeParser()); + } + + @Override + public void loadExtensions(ExtensionLoader loader) { + ExtensiblePlugin.super.loadExtensions(loader); + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index ace23402b..5df36f5af 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ 
b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.DocLevelMonitorQueries import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction @@ -106,8 +107,8 @@ import org.opensearch.index.IndexModule import org.opensearch.painless.spi.PainlessExtension import org.opensearch.painless.spi.Whitelist import org.opensearch.painless.spi.WhitelistLoader +import org.opensearch.percolator.PercolatorPluginExt import org.opensearch.plugins.ActionPlugin -import org.opensearch.plugins.Plugin import org.opensearch.plugins.ReloadablePlugin import org.opensearch.plugins.ScriptPlugin import org.opensearch.plugins.SearchPlugin @@ -126,7 +127,7 @@ import java.util.function.Supplier * It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY], * [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects. 
*/ -internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() { +internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, PercolatorPluginExt() { override fun getContextWhitelists(): Map, List> { val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt") @@ -152,6 +153,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var scheduler: JobScheduler lateinit var sweeper: JobSweeper lateinit var scheduledJobIndices: ScheduledJobIndices + lateinit var docLevelMonitorQueries: DocLevelMonitorQueries lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService @@ -257,11 +259,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) + docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) this.threadPool = threadPool this.clusterService = clusterService - return listOf(sweeper, scheduler, runner, scheduledJobIndices) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries) } override fun getSettings(): List> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 16bb92b5e..9f01d04c3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -4,11 +4,13 @@ import 
kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.index.IndexRequest +import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.Alert @@ -30,11 +32,14 @@ import org.opensearch.client.Client import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.Strings +import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.rest.RestStatus import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder @@ -93,22 +98,46 @@ object DocumentReturningMonitorRunner : MonitorRunner { } } - val queryToDocIds = mutableMapOf>() + val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) val idQueryMap = mutableMapOf() - queries.forEach { query -> - val matchingDocIds = runForEachQuery(monitorCtx, docExecutionContext, query, index) - queryToDocIds[query] = matchingDocIds - matchingDocIds.forEach { - docsToQueries.putIfAbsent(it, mutableListOf()) - 
docsToQueries[it]?.add(query.id) + + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) + + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) + + matchedQueriesForDocs.forEach { hit -> + val (id, query) = Pair( + hit.id.replace("_${monitor.id}", ""), + ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] + ) + val docLevelQuery = DocLevelQuery(id, id, query.toString()) + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + if (queryToDocIds.containsKey(docLevelQuery)) { + queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) + } else { + queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) + } + + if (docsToQueries.containsKey(matchingDocs[idx].first)) { + docsToQueries[matchingDocs[idx].first]?.add(id) + } else { + docsToQueries[matchingDocs[idx].first] = mutableListOf(id) + } + } } - idQueryMap[query.id] = query } + val queryInputResults = queryToDocIds.mapKeys { it.key.id } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) - val queryIds = queries.map { it.id } + val queryIds = queries.map { + idQueryMap[it.id] = it + it.id + } val triggerResults = mutableMapOf() monitor.triggers.forEach { @@ -339,6 +368,56 @@ object DocumentReturningMonitorRunner : MonitorRunner { return allShards.filter { it.primary() }.size } + private fun getMatchingDocs( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + docExecutionCtx: DocumentExecutionContext, + index: String, + dryrun: Boolean + ): List> { + val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int + val matchingDocs = mutableListOf>() + for (i: Int in 0 until count) { + val shard = i.toString() + try { + logger.info("Monitor execution for shard: $shard") + + val maxSeqNo: Long = 
docExecutionCtx.updatedLastRunContext[shard].toString().toLong() + logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") + + // If dryrun, set the previous sequence number as 1 less than the max sequence number or 0 + val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID) + max(-1, maxSeqNo - 1) + else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + + if (dryrun) { + logger.info("it is a dryrun") + } + + logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo") + + val hits: SearchHits = searchShard( + monitorCtx, + index, + shard, + prevSeqNo, + maxSeqNo, + null + ) + logger.info("Search hits for shard_$shard is: ${hits.hits.size}") + + if (hits.hits.isNotEmpty()) { +// logger.info("found matches") + matchingDocs.addAll(getAllDocs(hits, monitor.id)) + } + } catch (e: Exception) { + logger.info("Failed to run for shard $shard. Error: ${e.message}") + logger.debug("Failed to run for shard $shard", e) + } + } + return matchingDocs + } + private fun runForEachQuery( monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, @@ -367,7 +446,6 @@ object DocumentReturningMonitorRunner : MonitorRunner { logger.info("Search hits for shard_$shard is: ${hits.hits.size}") if (hits.hits.isNotEmpty()) { - logger.info("found matches") matchingDocs.addAll(getAllDocIds(hits)) } } catch (e: Exception) { @@ -384,14 +462,17 @@ object DocumentReturningMonitorRunner : MonitorRunner { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String + query: String? 
): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() } val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + + if (query != null) { + boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + } val request: SearchRequest = SearchRequest() .indices(index) @@ -410,7 +491,47 @@ object DocumentReturningMonitorRunner : MonitorRunner { return response.hits } + private fun getMatchedQueries( + monitorCtx: MonitorRunnerExecutionContext, + docs: List, + monitor: Monitor + ): SearchHits { + val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + + val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + val searchSourceBuilder = SearchSourceBuilder() + searchSourceBuilder.query(percolateQueryBuilder) + searchRequest.source(searchSourceBuilder) + + return if (monitorCtx.clusterService!!.state().routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + val response: SearchResponse = monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest).actionGet() + + if (response.status() !== RestStatus.OK) { + throw IOException("Failed to search percolate index: ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + } + response.hits + } else { + SearchHits.empty() + } + } + private fun getAllDocIds(hits: SearchHits): List { return hits.map { hit -> hit.id } } + + private fun getAllDocs(hits: SearchHits, monitorId: String): List> { + return hits.map { hit -> + val sourceMap = hit.sourceAsMap + + var xContentBuilder = XContentFactory.jsonBuilder().startObject() + sourceMap.forEach { (k, v) -> + xContentBuilder = xContentBuilder.field("${k}_$monitorId", v) + } + xContentBuilder = xContentBuilder.endObject() + + val sourceRef = BytesReference.bytes(xContentBuilder) + + Pair(hit.id, sourceRef) + } + } } diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 606795f22..b52d5817a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -29,6 +29,10 @@ import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -132,6 +136,10 @@ class TransportDeleteMonitorAction @Inject constructor( deleteRequest, object : ActionListener { override fun onResponse(response: DeleteResponse) { + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + deleteDocLevelMonitorQueries() + } actionListener.onResponse(response) } @@ -141,5 +149,20 @@ class TransportDeleteMonitorAction @Inject constructor( } ) } + + private fun deleteDocLevelMonitorQueries() { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) { + } + + override fun onFailure(t: Exception) { + } + } + ) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt 
b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 8813418fa..f54cfe7b9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -11,19 +11,32 @@ import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteMonitorRequest import org.opensearch.alerting.action.ExecuteMonitorResponse +import org.opensearch.alerting.core.DocLevelMonitorQueries +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import 
org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper @@ -40,12 +53,16 @@ private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) class TransportExecuteMonitorAction @Inject constructor( transportService: TransportService, private val client: Client, + private val clusterService: ClusterService, private val runner: MonitorRunnerService, actionFilters: ActionFilters, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, + private val docLevelMonitorQueries: DocLevelMonitorQueries, + private val settings: Settings ) : HandledTransportAction ( ExecuteMonitorAction.NAME, transportService, actionFilters, ::ExecuteMonitorRequest ) { + @Volatile private var indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings) override fun doExecute(task: Task, execMonitorRequest: ExecuteMonitorRequest, actionListener: ActionListener) { @@ -74,6 +91,65 @@ class TransportExecuteMonitorAction @Inject constructor( } } } + val indexDocLevelMonitorQueries = fun(monitor: Monitor, monitorId: String) { + val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + client.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + 
log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).timeout(indexTimeout) + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) + request.add(indexRequest) + } + + client.bulk( + request, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + executeMonitor(monitor) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + ) + } + } + } if (execMonitorRequest.monitorId != null) { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId) @@ -110,7 +186,23 @@ class TransportExecuteMonitorAction @Inject constructor( true -> execMonitorRequest.monitor as Monitor false -> (execMonitorRequest.monitor as Monitor).copy(user = user) } - executeMonitor(monitor) + + if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + indexDocLevelMonitorQueries(monitor, monitor.id) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } + } else { + executeMonitor(monitor) + } } } } diff --git 
a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 3575d5792..cdf46c132 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -10,6 +10,9 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -18,13 +21,16 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.DocumentReturningMonitorRunner import org.opensearch.alerting.action.IndexMonitorAction import org.opensearch.alerting.action.IndexMonitorRequest import org.opensearch.alerting.action.IndexMonitorResponse +import org.opensearch.alerting.core.DocLevelMonitorQueries import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.core.model.SearchInput @@ -52,6 +58,9 @@ import 
org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder @@ -67,6 +76,7 @@ class TransportIndexMonitorAction @Inject constructor( val client: Client, actionFilters: ActionFilters, val scheduledJobIndices: ScheduledJobIndices, + val docLevelMonitorQueries: DocLevelMonitorQueries, val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry @@ -395,6 +405,11 @@ class TransportIndexMonitorAction @Inject constructor( ) return } + + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + indexDocLevelMonitorQueries(request.monitor, response.id, request.refreshPolicy) + } + actionListener.onResponse( IndexMonitorResponse( response.id, response.version, response.seqNo, @@ -409,6 +424,81 @@ class TransportIndexMonitorAction @Inject constructor( ) } + @Suppress("UNCHECKED_CAST") + private fun indexDocLevelMonitorQueries(monitor: Monitor, monitorId: String, refreshPolicy: RefreshPolicy) { + val indexDocLevelQueries = fun(monitor: Monitor, monitorId: String) { + val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = properties.entries.associate { 
"${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + client.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout) + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) + request.add(indexRequest) + } + + client.bulk( + request, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + ) + } + } + } + + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + indexDocLevelQueries(monitor, monitorId) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } + } + private fun updateMonitor() { val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId) client.get( @@ -476,6 +566,22 @@ class TransportIndexMonitorAction @Inject constructor( 
) return } + + if (currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) { + indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, request.refreshPolicy) + } + + override fun onFailure(t: Exception) { + } + } + ) + } actionListener.onResponse( IndexMonitorResponse( response.id, response.version, response.seqNo, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index baa2a8946..fcc70ffba 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -119,6 +119,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + Thread.sleep(2000) indexDoc(testIndex, "1", testDoc) indexDoc(testIndex, "5", testDoc) diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt b/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt new file mode 100644 index 000000000..13067bbbf --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt @@ -0,0 +1,35 @@ +package org.opensearch.alerting.core + +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.alerting.core.model.ScheduledJob +import org.opensearch.client.AdminClient 
+import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings + +class DocLevelMonitorQueries(private val client: AdminClient, private val clusterService: ClusterService) { + companion object { + @JvmStatic + fun docLevelQueriesMappings(): String { + return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() + } + } + + fun initDocLevelQueryIndex(actionListener: ActionListener) { + if (!docLevelQueryIndexExists()) { + var indexRequest = CreateIndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .mapping(docLevelQueriesMappings()) + .settings( + Settings.builder().put("index.hidden", true) + .build() + ) + client.indices().create(indexRequest, actionListener) + } + } + + fun docLevelQueryIndexExists(): Boolean { + val clusterState = clusterService.state() + return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt index 6b132ced6..95e48d7e5 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt @@ -36,6 +36,7 @@ interface ScheduledJob : Writeable, ToXContentObject { companion object { /** The name of the ElasticSearch index in which we store jobs */ const val SCHEDULED_JOBS_INDEX = ".opendistro-alerting-config" + const val DOC_LEVEL_QUERIES_INDEX = ".opendistro-alerting-queries" const val NO_ID = "" diff --git a/core/src/main/resources/mappings/doc-level-queries.json b/core/src/main/resources/mappings/doc-level-queries.json new file mode 100644 index 000000000..3e6145a16 --- /dev/null +++ b/core/src/main/resources/mappings/doc-level-queries.json @@ -0,0 +1,13 @@ +{ + "_meta": { + "schema_version": 5 + }, + "properties": { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { 
+ "type": "text" + } + } +} \ No newline at end of file