diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
new file mode 100644
index 000000000000..a10eaba7c954
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/OpenSearchVectorUtils.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope, PropertyDependency
+from EmbeddingUtils import OPENAI, HUGGING_FACE, EMBEDDING_MODEL
+
+# Space types
+L2 = ("L2 (Euclidean distance)", "l2")
+L1 = ("L1 (Manhattan distance)", "l1")
+LINF = ("L-infinity (chessboard) distance", "linf")
+COSINESIMIL = ("Cosine similarity", "cosinesimil")
+
+HUGGING_FACE_API_KEY = PropertyDescriptor(
+    name="HuggingFace API Key",
+    description="The API Key for interacting with HuggingFace",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
+)
+HUGGING_FACE_MODEL = PropertyDescriptor(
+    name="HuggingFace Model",
+    description="The name of the HuggingFace model to use",
+    default_value="sentence-transformers/all-MiniLM-L6-v2",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, HUGGING_FACE)]
+)
+OPENAI_API_KEY = PropertyDescriptor(
+    name="OpenAI API Key",
+    description="The API Key for OpenAI in order to create embeddings",
+    required=True,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
+)
+OPENAI_API_MODEL = PropertyDescriptor(
+    name="OpenAI Model",
+    description="The name of the OpenAI model to use for creating embeddings",
+    default_value="text-embedding-ada-002",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    dependencies=[PropertyDependency(EMBEDDING_MODEL, OPENAI)]
+)
+HTTP_HOST = PropertyDescriptor(
+    name="HTTP Host",
+    description="URL where OpenSearch is hosted.",
+    default_value="http://localhost:9200",
+    required=True,
+    validators=[StandardValidators.URL_VALIDATOR]
+)
+USERNAME = PropertyDescriptor(
+    name="Username",
+    description="The username to use for authenticating to the OpenSearch server",
+    required=False,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+PASSWORD = PropertyDescriptor(
+    name="Password",
+    description="The password to use for authenticating to the OpenSearch server",
+    required=False,
+    sensitive=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR]
+)
+INDEX_NAME = PropertyDescriptor(
+    name="Index Name",
+    description="The name of the OpenSearch index.",
+    sensitive=False,
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+VECTOR_FIELD = PropertyDescriptor(
+    name="Vector Field Name",
+    description="The name of the field in the document where the embeddings are stored. This field needs to be a 'knn_vector' typed field.",
+    default_value="vector_field",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+TEXT_FIELD = PropertyDescriptor(
+    name="Text Field Name",
+    description="The name of the field in the document where the text is stored.",
+    default_value="text",
+    required=True,
+    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+)
+
+
+def create_authentication_params(context):
+    username = context.getProperty(USERNAME).getValue()
+    password = context.getProperty(PASSWORD).getValue()
+
+    params = {"verify_certs": "true"}
+
+    if username is not None and password is not None:
+        params["http_auth"] = (username, password)
+
+    return params
+
+
+def parse_documents(json_lines, id_field_name, file_name):
+    import json
+
+    texts = []
+    metadatas = []
+    ids = []
+    for i, line in enumerate(json_lines.split("\n"), start=1):
+        try:
+            doc = json.loads(line)
+        except Exception as e:
+            raise ValueError(f"Could not parse line {i} as JSON") from e
+
+        text = doc.get('text')
+        metadata = doc.get('metadata')
+        texts.append(text)
+
+        # Remove any null values, or it will cause the embedding to fail
+        filtered_metadata = {key: value for key, value in metadata.items() if value is not None}
+        metadatas.append(filtered_metadata)
+
+        doc_id = None
+        if id_field_name is not None:
+            doc_id = metadata.get(id_field_name)
+        if doc_id is None:
+            doc_id = file_name + "-" + str(i)
+        ids.append(doc_id)
+
+    return {"texts": texts, "metadatas": metadatas, "ids": ids}
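For reference, a minimal sketch of the JSON Lines input that parse_documents expects and the structure it returns; the two records, field names, and filename below are hypothetical:

```python
# Hypothetical two-record FlowFile content; field values are illustrative only.
json_lines = "\n".join([
    '{"text": "NiFi is a dataflow system.", "metadata": {"source": "nifi.md", "chunk_index": 0, "page": null}}',
    '{"text": "Processors are the building blocks.", "metadata": {"source": "nifi.md", "chunk_index": 1}}',
])

result = parse_documents(json_lines, id_field_name=None, file_name="nifi.md")
# result["texts"]     == ["NiFi is a dataflow system.", "Processors are the building blocks."]
# result["metadatas"] == [{"source": "nifi.md", "chunk_index": 0},   # null 'page' value dropped
#                         {"source": "nifi.md", "chunk_index": 1}]
# result["ids"]       == ["nifi.md-1", "nifi.md-2"]                  # no ID field, so filename + one-up number
```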
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
new file mode 100644
index 000000000000..c0ff29bdb7bd
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/PutOpenSearchVector.py
@@ -0,0 +1,245 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY, OPENAI_API_MODEL, HUGGING_FACE_API_KEY,
+                                   HUGGING_FACE_MODEL, HTTP_HOST, USERNAME, PASSWORD, INDEX_NAME, VECTOR_FIELD,
+                                   TEXT_FIELD, create_authentication_params, parse_documents)
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+from nifiapi.documentation import use_case, ProcessorConfiguration
+
+
+@use_case(description="Create vectors/embeddings that represent text content and send the vectors to OpenSearch",
+          notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "insert"],
+          configuration="""
+              Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
+              Configure 'Embedding Model' to indicate whether OpenAI embeddings or a HuggingFace embedding model should be used ('Hugging Face Model' or 'OpenAI Model').
+              Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
+              Set 'Index Name' to the name of your OpenSearch Index.
+              Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
+              Set 'Text Field Name' to the name of the field in the document which will store the text data.
+
+              If the documents to send to OpenSearch contain a unique identifier, set the 'Document ID Field Name' property to the name of the field that contains the document ID.
+              This property can be left blank, in which case a unique ID will be generated based on the FlowFile's filename.
+
+              If the provided index does not exist in OpenSearch, the processor is capable of creating it. The 'New Index Strategy' property defines
+              whether the index should be created from the default template or configured with custom values.
+              """)
+@use_case(description="Update vectors/embeddings in OpenSearch",
+          notes="This use case assumes that the data has already been formatted in JSONL format with the text to store in OpenSearch provided in the 'text' field.",
+          keywords=["opensearch", "embedding", "vector", "text", "vectorstore", "update", "upsert"],
+          configuration="""
+              Configure the 'HTTP Host' to an appropriate URL where OpenSearch is accessible.
+              Configure 'Embedding Model' to indicate whether OpenAI embeddings or a HuggingFace embedding model should be used ('Hugging Face Model' or 'OpenAI Model').
+              Configure the 'OpenAI API Key' or 'HuggingFace API Key', depending on the chosen Embedding Model.
+              Set 'Index Name' to the name of your OpenSearch Index.
+              Set 'Vector Field Name' to the name of the field in the document which will store the vector data.
+              Set 'Text Field Name' to the name of the field in the document which will store the text data.
+              Set the 'Document ID Field Name' property to the name of the field that contains the identifier of the document in OpenSearch to update.
+              """)
+class PutOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '@project.version@'
+        description = """Publishes JSON data to OpenSearch. The incoming data must be in single-JSON-per-line format, with each line containing two keys: 'text' and 'metadata'.
+                       The text must be a string, while metadata must be a map with strings for values. Any additional fields will be ignored."""
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Engine types
+    NMSLIB = ("nmslib (Non-Metric Space Library)", "nmslib")
+    FAISS = ("faiss (Facebook AI Similarity Search)", "faiss")
+    LUCENE = ("lucene", "lucene")
+
+    ENGINE_VALUES = dict([NMSLIB, FAISS, LUCENE])
+
+    # Space types
+    INNERPRODUCT = ("Inner product", "innerproduct")
+
+    NMSLIB_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, INNERPRODUCT])
+    FAISS_SPACE_TYPE_VALUES = dict([L2, INNERPRODUCT])
+    LUCENE_SPACE_TYPE_VALUES = dict([L2, COSINESIMIL])
+
+    # New Index Mapping Strategy
+    DEFAULT_INDEX_MAPPING = "Default index mapping"
+    CUSTOM_INDEX_MAPPING = "Custom index mapping"
+
+    DOC_ID_FIELD_NAME = PropertyDescriptor(
+        name="Document ID Field Name",
+        description="""Specifies the name of the field in the 'metadata' element of each document where the document's ID can be found.
+                    If not specified, an ID will be generated based on the FlowFile's filename and a one-up number.""",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NEW_INDEX_STRATEGY = PropertyDescriptor(
+        name="New Index Strategy",
+        description="Specifies the Mapping strategy to use for new index creation. The default template values are the following: "
+                    "{engine: nmslib, space_type: l2, ef_search: 512, ef_construction: 512, m: 16}",
+        allowable_values=[DEFAULT_INDEX_MAPPING, CUSTOM_INDEX_MAPPING],
+        default_value=DEFAULT_INDEX_MAPPING,
+        required=False,
+    )
+    ENGINE = PropertyDescriptor(
+        name="Engine",
+        description="The approximate k-NN library to use for indexing and search.",
+        allowable_values=ENGINE_VALUES.keys(),
+        default_value=NMSLIB[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
+    )
+    NMSLIB_SPACE_TYPE = PropertyDescriptor(
+        name="NMSLIB Space Type",
+        description="The vector space used to calculate the distance between vectors.",
+        allowable_values=NMSLIB_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, NMSLIB[0])]
+    )
+    FAISS_SPACE_TYPE = PropertyDescriptor(
+        name="FAISS Space Type",
+        description="The vector space used to calculate the distance between vectors.",
+        allowable_values=FAISS_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, FAISS[0])]
+    )
+    LUCENE_SPACE_TYPE = PropertyDescriptor(
+        name="Lucene Space Type",
+        description="The vector space used to calculate the distance between vectors.",
+        allowable_values=LUCENE_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING),
+                      PropertyDependency(ENGINE, LUCENE[0])]
+    )
+    EF_SEARCH = PropertyDescriptor(
+        name="EF Search",
+        description="The size of the dynamic list used during k-NN searches. Higher values lead to more accurate but slower searches.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
+    )
+    EF_CONSTRUCTION = PropertyDescriptor(
+        name="EF Construction",
+        description="The size of the dynamic list used during k-NN graph creation. Higher values lead to a more accurate graph but slower indexing speed.",
+        default_value="512",
+        required=False,
+        validators=[StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
+    )
+    M = PropertyDescriptor(
+        name="M",
+        description="The number of bidirectional links that the plugin creates for each new element. Increasing and "
+                    "decreasing this value can have a large impact on memory consumption. Keep this value between 2 and 100.",
+        default_value="16",
+        required=False,
+        validators=[StandardValidators._standard_validators.createLongValidator(2, 100, True)],
+        dependencies=[PropertyDependency(NEW_INDEX_STRATEGY, CUSTOM_INDEX_MAPPING)]
+    )
+
+    properties = [EMBEDDING_MODEL,
+                  OPENAI_API_KEY,
+                  OPENAI_API_MODEL,
+                  HUGGING_FACE_API_KEY,
+                  HUGGING_FACE_MODEL,
+                  HTTP_HOST,
+                  USERNAME,
+                  PASSWORD,
+                  INDEX_NAME,
+                  DOC_ID_FIELD_NAME,
+                  VECTOR_FIELD,
+                  TEXT_FIELD,
+                  NEW_INDEX_STRATEGY,
+                  ENGINE,
+                  NMSLIB_SPACE_TYPE,
+                  FAISS_SPACE_TYPE,
+                  LUCENE_SPACE_TYPE,
+                  EF_SEARCH,
+                  EF_CONSTRUCTION,
+                  M]
+
+    embeddings = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.properties
+
+    def onScheduled(self, context):
+        self.embeddings = create_embedding_service(context)
+
+    def transform(self, context, flowfile):
+        file_name = flowfile.getAttribute("filename")
+        http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
+        index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        id_field_name = context.getProperty(self.DOC_ID_FIELD_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        new_index_strategy = context.getProperty(self.NEW_INDEX_STRATEGY).evaluateAttributeExpressions().getValue()
+
+        params = {"vector_field": vector_field, "text_field": text_field}
+        params.update(create_authentication_params(context))
+
+        if new_index_strategy == self.CUSTOM_INDEX_MAPPING:
+            engine = context.getProperty(self.ENGINE).evaluateAttributeExpressions().getValue()
+            params["engine"] = self.ENGINE_VALUES.get(engine)
+
+            if engine == self.NMSLIB[0]:
+                space_type = context.getProperty(self.NMSLIB_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = self.NMSLIB_SPACE_TYPE_VALUES.get(space_type)
+            if engine == self.FAISS[0]:
+                space_type = context.getProperty(self.FAISS_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = self.FAISS_SPACE_TYPE_VALUES.get(space_type)
+            if engine == self.LUCENE[0]:
+                space_type = context.getProperty(self.LUCENE_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = self.LUCENE_SPACE_TYPE_VALUES.get(space_type)
+
+            ef_search = context.getProperty(self.EF_SEARCH).evaluateAttributeExpressions().asInteger()
+            params["ef_search"] = ef_search
+
+            ef_construction = context.getProperty(self.EF_CONSTRUCTION).evaluateAttributeExpressions().asInteger()
+            params["ef_construction"] = ef_construction
+
+            m = context.getProperty(self.M).evaluateAttributeExpressions().asInteger()
+            params["m"] = m
+
+        # Read the FlowFile content as "json-lines".
+        json_lines = flowfile.getContentsAsBytes().decode()
+        parsed_documents = parse_documents(json_lines, id_field_name, file_name)
+
+        vectorstore = OpenSearchVectorSearch(
+            opensearch_url=http_host,
+            index_name=index_name,
+            embedding_function=self.embeddings,
+            **params
+        )
+        vectorstore.add_texts(texts=parsed_documents["texts"],
+                              metadatas=parsed_documents["metadatas"],
+                              ids=parsed_documents["ids"],
+                              **params
+                              )
+
+        return FlowFileTransformResult(relationship="success")
+
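To make the 'Custom index mapping' strategy concrete, the sketch below shows the keyword arguments the transform method assembles with the default property values, and an indicative (simplified, not exact) view of the OpenSearch k-NN index mapping they correspond to; the vector dimension depends on the chosen embedding model:

```python
# Keyword arguments assembled for OpenSearchVectorSearch with the default
# 'Custom index mapping' property values (plus any http_auth/verify_certs):
params = {
    "vector_field": "vector_field",
    "text_field": "text",
    "engine": "nmslib",
    "space_type": "l2",
    "ef_search": 512,
    "ef_construction": 512,
    "m": 16,
}

# Indicative k-NN index mapping these settings roughly translate to when the
# index is created (simplified; dimension shown for text-embedding-ada-002):
index_body = {
    "settings": {"index": {"knn": True, "knn.algo_param.ef_search": 512}},
    "mappings": {
        "properties": {
            "vector_field": {
                "type": "knn_vector",
                "dimension": 1536,
                "method": {
                    "name": "hnsw",
                    "engine": "nmslib",
                    "space_type": "l2",
                    "parameters": {"ef_construction": 512, "m": 16},
                },
            }
        }
    },
}
```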
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
new file mode 100644
index 000000000000..488c01d197a8
--- /dev/null
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/QueryOpenSearchVector.py
@@ -0,0 +1,219 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from langchain.vectorstores import OpenSearchVectorSearch
+from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
+from nifiapi.properties import PropertyDescriptor, StandardValidators, ExpressionLanguageScope, PropertyDependency
+from OpenSearchVectorUtils import (L2, L1, LINF, COSINESIMIL, OPENAI_API_KEY, OPENAI_API_MODEL, HUGGING_FACE_API_KEY, HUGGING_FACE_MODEL, HTTP_HOST,
+                                   USERNAME, PASSWORD, INDEX_NAME, VECTOR_FIELD, TEXT_FIELD, create_authentication_params)
+from QueryUtils import OUTPUT_STRATEGY, RESULTS_FIELD, INCLUDE_METADATAS, INCLUDE_DISTANCES, QueryUtils
+import json
+from EmbeddingUtils import EMBEDDING_MODEL, create_embedding_service
+
+
+class QueryOpenSearchVector(FlowFileTransform):
+    class Java:
+        implements = ['org.apache.nifi.python.processor.FlowFileTransform']
+
+    class ProcessorDetails:
+        version = '@project.version@'
+        description = "Queries OpenSearch in order to gather a specified number of documents that are most closely related to the given query."
+        tags = ["opensearch", "vector", "vectordb", "vectorstore", "embeddings", "ai", "artificial intelligence", "ml",
+                "machine learning", "text", "LLM"]
+
+    # Search types
+    APPROXIMATE_SEARCH = ("Approximate Search", "approximate_search")
+    SCRIPT_SCORING_SEARCH = ("Script Scoring Search", "script_scoring")
+    PAINLESS_SCRIPTING_SEARCH = ("Painless Scripting Search", "painless_scripting")
+
+    SEARCH_TYPE_VALUES = dict([APPROXIMATE_SEARCH, SCRIPT_SCORING_SEARCH, PAINLESS_SCRIPTING_SEARCH])
+
+    # Script Scoring Search space types
+    HAMMINGBIT = ("Hamming distance", "hammingbit")
+
+    SCRIPT_SCORING_SPACE_TYPE_VALUES = dict([L2, L1, LINF, COSINESIMIL, HAMMINGBIT])
+
+    # Painless Scripting Search space types
+    L2_SQUARED = ("L2 (Euclidean distance)", "l2Squared")
+    L1_NORM = ("L1 (Manhattan distance)", "l1Norm")
+    COSINE_SIMILARITY = ("Cosine similarity", "cosineSimilarity")
+
+    PAINLESS_SCRIPTING_SPACE_TYPE_VALUES = dict([L2_SQUARED, L1_NORM, COSINE_SIMILARITY])
+
+    QUERY = PropertyDescriptor(
+        name="Query",
+        description="The text of the query to send to OpenSearch.",
+        required=True,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    NUMBER_OF_RESULTS = PropertyDescriptor(
+        name="Number of Results",
+        description="The number of results to return from OpenSearch",
+        default_value="10",
+        required=True,
+        validators=[StandardValidators.POSITIVE_INTEGER_VALIDATOR],
+        expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES
+    )
+    SEARCH_TYPE = PropertyDescriptor(
+        name="Search Type",
+        description="Specifies the type of the search to be performed.",
+        allowable_values=SEARCH_TYPE_VALUES.keys(),
+        default_value=APPROXIMATE_SEARCH[0],
+        required=True
+    )
+    SCRIPT_SCORING_SPACE_TYPE = PropertyDescriptor(
+        name="Script Scoring Space Type",
+        description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
+        allowable_values=SCRIPT_SCORING_SPACE_TYPE_VALUES.keys(),
+        default_value=L2[0],
+        required=False,
+        dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0])]
+    )
+    PAINLESS_SCRIPTING_SPACE_TYPE = PropertyDescriptor(
+        name="Painless Scripting Space Type",
+        description="Used to measure the distance between two points in order to determine the k-nearest neighbors.",
+        allowable_values=PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.keys(),
+        default_value=L2_SQUARED[0],
+        required=False,
+        dependencies=[PropertyDependency(SEARCH_TYPE, PAINLESS_SCRIPTING_SEARCH[0])]
+    )
+    BOOLEAN_FILTER = PropertyDescriptor(
+        name="Boolean Filter",
+        description="A Boolean filter is a post filter that consists of a Boolean query containing a k-NN query and a filter. "
+                    "The value of the field must be a JSON representation of the filter.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
+    )
+    EFFICIENT_FILTER = PropertyDescriptor(
+        name="Efficient Filter",
+        description="The Lucene Engine or Faiss Engine decides whether to perform an exact k-NN search with "
+                    "pre-filtering or an approximate search with modified post-filtering. The value of the field must "
+                    "be a JSON representation of the filter.",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, APPROXIMATE_SEARCH[0])]
+    )
+    PRE_FILTER = PropertyDescriptor(
+        name="Pre Filter",
+        description="Script Score query to pre-filter documents before identifying nearest neighbors. The value of "
+                    "the field must be a JSON representation of the filter.",
+        default_value="{\"match_all\": {}}",
+        required=False,
+        validators=[StandardValidators.NON_EMPTY_VALIDATOR],
+        dependencies=[PropertyDependency(SEARCH_TYPE, SCRIPT_SCORING_SEARCH[0], PAINLESS_SCRIPTING_SEARCH[0])]
+    )
+
+    properties = [EMBEDDING_MODEL,
+                  OPENAI_API_KEY,
+                  OPENAI_API_MODEL,
+                  HUGGING_FACE_API_KEY,
+                  HUGGING_FACE_MODEL,
+                  HTTP_HOST,
+                  USERNAME,
+                  PASSWORD,
+                  INDEX_NAME,
+                  QUERY,
+                  VECTOR_FIELD,
+                  TEXT_FIELD,
+                  NUMBER_OF_RESULTS,
+                  SEARCH_TYPE,
+                  SCRIPT_SCORING_SPACE_TYPE,
+                  PAINLESS_SCRIPTING_SPACE_TYPE,
+                  BOOLEAN_FILTER,
+                  EFFICIENT_FILTER,
+                  PRE_FILTER,
+                  OUTPUT_STRATEGY,
+                  RESULTS_FIELD,
+                  INCLUDE_METADATAS,
+                  INCLUDE_DISTANCES]
+
+    embeddings = None
+    query_utils = None
+
+    def __init__(self, **kwargs):
+        pass
+
+    def getPropertyDescriptors(self):
+        return self.properties
+
+    def onScheduled(self, context):
+        # initialize embedding service
+        self.embeddings = create_embedding_service(context)
+        self.query_utils = QueryUtils(context)
+
+    def transform(self, context, flowfile):
+        http_host = context.getProperty(HTTP_HOST).evaluateAttributeExpressions(flowfile).getValue()
+        index_name = context.getProperty(INDEX_NAME).evaluateAttributeExpressions(flowfile).getValue()
+        query = context.getProperty(self.QUERY).evaluateAttributeExpressions(flowfile).getValue()
+        num_results = context.getProperty(self.NUMBER_OF_RESULTS).evaluateAttributeExpressions(flowfile).asInteger()
+        vector_field = context.getProperty(VECTOR_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        text_field = context.getProperty(TEXT_FIELD).evaluateAttributeExpressions(flowfile).getValue()
+        search_type = context.getProperty(self.SEARCH_TYPE).evaluateAttributeExpressions().getValue()
+
+        params = {"vector_field": vector_field,
+                  "text_field": text_field,
+                  "search_type": self.SEARCH_TYPE_VALUES.get(search_type)}
+        params.update(create_authentication_params(context))
+
+        if search_type == self.APPROXIMATE_SEARCH[0]:
+            boolean_filter = context.getProperty(self.BOOLEAN_FILTER).evaluateAttributeExpressions().getValue()
+            if boolean_filter is not None:
+                params["boolean_filter"] = json.loads(boolean_filter)
+
+            efficient_filter = context.getProperty(self.EFFICIENT_FILTER).evaluateAttributeExpressions().getValue()
+            if efficient_filter is not None:
+                params["efficient_filter"] = json.loads(efficient_filter)
+        else:
+            pre_filter = context.getProperty(self.PRE_FILTER).evaluateAttributeExpressions().getValue()
+            if pre_filter is not None:
+                params["pre_filter"] = json.loads(pre_filter)
+            if search_type == self.SCRIPT_SCORING_SEARCH[0]:
+                space_type = context.getProperty(self.SCRIPT_SCORING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = self.SCRIPT_SCORING_SPACE_TYPE_VALUES.get(space_type)
+            elif search_type == self.PAINLESS_SCRIPTING_SEARCH[0]:
+                space_type = context.getProperty(self.PAINLESS_SCRIPTING_SPACE_TYPE).evaluateAttributeExpressions().getValue()
+                params["space_type"] = self.PAINLESS_SCRIPTING_SPACE_TYPE_VALUES.get(space_type)
+
+        vectorstore = OpenSearchVectorSearch(index_name=index_name,
+                                             embedding_function=self.embeddings,
+                                             opensearch_url=http_host,
+                                             **params
+                                             )
+
+        results = vectorstore.similarity_search_with_score(query=query, k=num_results, **params)
+
+        documents = []
+        for result in results:
+            documents.append(result[0].page_content)
+
+        # Convert the property values to booleans; the PropertyValue object itself is always truthy.
+        if context.getProperty(INCLUDE_METADATAS).asBoolean():
+            metadatas = []
+            for result in results:
+                metadatas.append(result[0].metadata)
+        else:
+            metadatas = None
+
+        if context.getProperty(INCLUDE_DISTANCES).asBoolean():
+            distances = []
+            for result in results:
+                distances.append(result[1])
+        else:
+            distances = None
+
+        (output_contents, mime_type) = self.query_utils.create_json(flowfile, documents, metadatas, None, distances, None)
+        attributes = {"mime.type": mime_type}
+
+        return FlowFileTransformResult(relationship="success", contents=output_contents, attributes=attributes)
diff --git a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
index ad78d29d03f4..fbefc24508e0 100644
--- a/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
+++ b/nifi-python-extensions/nifi-text-embeddings-module/src/main/python/vectorstores/requirements.txt
@@ -27,3 +27,6 @@ requests
 pinecone-client==3.0.1
 tiktoken
 langchain==0.1.11
+
+# OpenSearch requirements
+opensearch-py==2.5.0
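As a rough standalone sketch of what the two processors do through langchain: the host, index name, credentials, and texts below are hypothetical, and the keyword arguments are simplified to per-call kwargs rather than the shared params dict used in the code above.

```python
from langchain.vectorstores import OpenSearchVectorSearch
from langchain.embeddings.openai import OpenAIEmbeddings

# Hypothetical connection details; in NiFi these come from the processor properties.
embeddings = OpenAIEmbeddings(openai_api_key="sk-...", model="text-embedding-ada-002")
store = OpenSearchVectorSearch(
    opensearch_url="http://localhost:9200",
    index_name="nifi-documents",
    embedding_function=embeddings,
    http_auth=("admin", "admin"),
    verify_certs=False,
)

# Roughly what PutOpenSearchVector does for each FlowFile:
store.add_texts(texts=["NiFi is a dataflow system."],
                metadatas=[{"source": "nifi.md"}],
                ids=["nifi.md-1"],
                vector_field="vector_field",
                text_field="text")

# Roughly what QueryOpenSearchVector does for each FlowFile (approximate search):
results = store.similarity_search_with_score(query="What is NiFi?",
                                             k=10,
                                             vector_field="vector_field",
                                             text_field="text",
                                             search_type="approximate_search")
for document, score in results:
    print(document.page_content, document.metadata, score)
```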