Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support add_embeddings for elasticsearch #11002

Merged
merged 3 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 109 additions & 58 deletions libs/langchain/langchain/vectorstores/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,33 +866,17 @@ def _create_index_if_not_exists(
)
self.client.indices.create(index=index_name, **indexSettings)

def add_texts(
def __add(
self,
texts: Iterable[str],
embeddings: Optional[List[List[float]]],
metadatas: Optional[List[Dict[Any, Any]]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.

Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
refresh_indices: Whether to refresh the Elasticsearch indices
after adding the texts.
create_index_if_not_exists: Whether to create the Elasticsearch
index if it doesn't already exist.
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
- chunk_size: Optional. Number of texts to add to the
index at a time. Defaults to 500.

Returns:
List of ids from adding the texts into the vectorstore.
"""
try:
from elasticsearch.helpers import BulkIndexError, bulk
except ImportError:
Expand All @@ -901,53 +885,33 @@ def add_texts(
"Please install it with `pip install elasticsearch`."
)
bulk_kwargs = bulk_kwargs or {}
embeddings = []
ids = ids or [str(uuid.uuid4()) for _ in texts]
requests = []

if self.embedding is not None:
# If no search_type requires inference, we use the provided
# embedding function to embed the texts.
embeddings = self.embedding.embed_documents(list(texts))
dims_length = len(embeddings[0])

if create_index_if_not_exists:
self._create_index_if_not_exists(
index_name=self.index_name, dims_length=dims_length
)

for i, (text, vector) in enumerate(zip(texts, embeddings)):
metadata = metadatas[i] if metadatas else {}
if create_index_if_not_exists:
if embeddings:
dims_length = len(embeddings[0])
else:
dims_length = None

requests.append(
{
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
self.vector_query_field: vector,
"metadata": metadata,
"_id": ids[i],
}
)
self._create_index_if_not_exists(
index_name=self.index_name, dims_length=dims_length
)

else:
# the search_type doesn't require inference, so we don't need to
# embed the texts.
if create_index_if_not_exists:
self._create_index_if_not_exists(index_name=self.index_name)
for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}

for i, text in enumerate(texts):
metadata = metadatas[i] if metadatas else {}
request = {
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
"metadata": metadata,
"_id": ids[i],
}
if embeddings:
request[self.vector_query_field] = embeddings[i]

requests.append(
{
"_op_type": "index",
"_index": self.index_name,
self.query_field: text,
"metadata": metadata,
"_id": ids[i],
}
)
requests.append(request)

if len(requests) > 0:
try:
Expand All @@ -974,6 +938,93 @@ def add_texts(
logger.debug("No texts to add to index")
return []

def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[Dict[Any, Any]]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.

Args:
texts: Iterable of strings to add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of ids to associate with the texts.
refresh_indices: Whether to refresh the Elasticsearch indices
after adding the texts.
create_index_if_not_exists: Whether to create the Elasticsearch
index if it doesn't already exist.
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
- chunk_size: Optional. Number of texts to add to the
index at a time. Defaults to 500.

Returns:
List of ids from adding the texts into the vectorstore.
"""
if self.embedding is not None:
# If no search_type requires inference, we use the provided
# embedding function to embed the texts.
embeddings = self.embedding.embed_documents(list(texts))
else:
# the search_type doesn't require inference, so we don't need to
# embed the texts.
embeddings = None

return self.__add(
texts,
embeddings,
metadatas=metadatas,
ids=ids,
refresh_indices=refresh_indices,
create_index_if_not_exists=create_index_if_not_exists,
bulk_kwargs=bulk_kwargs,
kwargs=kwargs,
)

def add_embeddings(
self,
text_embeddings: Iterable[Tuple[str, List[float]]],
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
refresh_indices: bool = True,
create_index_if_not_exists: bool = True,
bulk_kwargs: Optional[Dict] = None,
**kwargs: Any,
) -> List[str]:
"""Add the given texts and embeddings to the vectorstore.

Args:
text_embeddings: Iterable pairs of string and embedding to
add to the vectorstore.
metadatas: Optional list of metadatas associated with the texts.
ids: Optional list of unique IDs.
refresh_indices: Whether to refresh the Elasticsearch indices
after adding the texts.
create_index_if_not_exists: Whether to create the Elasticsearch
index if it doesn't already exist.
*bulk_kwargs: Additional arguments to pass to Elasticsearch bulk.
- chunk_size: Optional. Number of texts to add to the
index at a time. Defaults to 500.

Returns:
List of ids from adding the texts into the vectorstore.
"""
texts, embeddings = zip(*text_embeddings)
return self.__add(
list(texts),
list(embeddings),
metadatas=metadatas,
ids=ids,
refresh_indices=refresh_indices,
create_index_if_not_exists=create_index_if_not_exists,
bulk_kwargs=bulk_kwargs,
kwargs=kwargs,
)

@classmethod
def from_texts(
cls,
Expand Down
2 changes: 1 addition & 1 deletion libs/langchain/langchain/vectorstores/faiss.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def add_embeddings(
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> List[str]:
"""Run more texts through the embeddings and add to the vectorstore.
"""Add the given texts and embeddings to the vectorstore.

Args:
text_embeddings: Iterable pairs of string and embedding to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,34 @@ async def test_similarity_search_without_metadat_async(
output = await docsearch.asimilarity_search("foo", k=1)
assert output == [Document(page_content="foo")]

def test_add_embeddings(
self, elasticsearch_connection: dict, index_name: str
) -> None:
"""
Test add_embeddings, which accepts pre-built embeddings instead of
using inference for the texts.
This allows you to separate the embeddings text and the page_content
for better proximity between user's question and embedded text.
For example, your embedding text can be a question, whereas page_content
is the answer.
"""
embeddings = ConsistentFakeEmbeddings()
text_input = ["foo1", "foo2", "foo3"]
metadatas = [{"page": i} for i in range(len(text_input))]

"""In real use case, embedding_input can be questions for each text"""
embedding_input = ["foo2", "foo3", "foo1"]
embedding_vectors = embeddings.embed_documents(embedding_input)

docsearch = ElasticsearchStore._create_cls_from_kwargs(
embeddings,
**elasticsearch_connection,
index_name=index_name,
)
docsearch.add_embeddings(list(zip(text_input, embedding_vectors)), metadatas)
output = docsearch.similarity_search("foo1", k=1)
assert output == [Document(page_content="foo3", metadata={"page": 2})]

def test_similarity_search_with_metadata(
self, elasticsearch_connection: dict, index_name: str
) -> None:
Expand Down