Add batch update of embeddings in document stores #733
Conversation
index: Optional[str] = None,
filters: Optional[Dict[str, List[str]]] = None,
return_embedding: Optional[bool] = None,
page_number: Optional[int] = None,
I find the term `page` here quite confusing because it is essentially a document. I see that these two variables are used to take a slice of the full set of documents. Some terminology more along the lines of `documents` and `slices` would be clearer in my opinion.
@@ -387,10 +392,12 @@ def get_label_count(self, index: Optional[str] = None) -> int:
    return self.get_document_count(index=index)

def get_all_documents(
With the new params, this fn can be configured to return less than all documents, which makes the name of this fn quite confusing. Probably not something to deal with in this PR (not least since it would be a breaking change), but definitely worth addressing as an issue.
haystack/document_store/sql.py
Outdated
@@ -166,6 +173,8 @@ def get_all_documents(
    DocumentORM.text,
    DocumentORM.vector_id
).filter_by(index=index)
if page_number is not None and page_size is not None:
    documents_query = documents_query.offset(page_number * page_size).limit(page_size)
Pagination via `offset` can cause performance issues because the DB has to scan through all the preceding rows. A window query (supported by most major DBs: Postgres, Oracle, MySQL, etc.) or a non-window query using LIMIT (for SQLite) will give a good performance advantage. For more information: https://github.com/sqlalchemy/sqlalchemy/wiki/RangeQuery-and-WindowedRangeQuery
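A minimal sketch of the LIMIT-based "seek" alternative described on that wiki page, assuming `DocumentORM.id` is an indexed, orderable column and a SQLAlchemy `session` is available; the function name and window size are illustrative only:

```python
from typing import Iterator, List

def _windowed_document_query(session, index: str, window_size: int = 10_000) -> Iterator[List["DocumentORM"]]:
    """Yield documents in windows, seeking past the last seen id instead of using OFFSET."""
    last_id = None
    while True:
        query = session.query(DocumentORM).filter_by(index=index).order_by(DocumentORM.id)
        if last_id is not None:
            query = query.filter(DocumentORM.id > last_id)  # resume right after the previous window
        rows = query.limit(window_size).all()
        if not rows:
            break
        last_id = rows[-1].id
        yield rows
```

Each window then touches only `window_size` rows instead of rescanning everything before the offset.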
This would be easy to implement if `get_all_documents()` returned an iterator that yields all documents with a windowed query. As per the current implementation in this PR, each paginated query is an independent call to `get_all_documents()`. Having said that, it might be a good idea to implement an optional mode where `get_all_documents()` returns an iterator with paginated results. This would abstract away the details of pagination and allow us to use different pagination for each document store while maintaining a uniform interface with `get_all_documents()`.
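A rough sketch of such an optional generator mode, assuming it can be layered on top of the paginated `get_all_documents()` added in this PR (the method name and `page_size` default are illustrative):

```python
from typing import Dict, Generator, List, Optional

def get_all_documents_generator(
    self,
    index: Optional[str] = None,
    filters: Optional[Dict[str, List[str]]] = None,
    page_size: int = 10_000,
) -> Generator["Document", None, None]:
    """Yield documents page by page so callers never hold the full index in memory."""
    page_number = 0
    while True:
        documents = self.get_all_documents(
            index=index, filters=filters, page_number=page_number, page_size=page_size
        )
        if not documents:
            break
        yield from documents
        page_number += 1
```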
filters: Optional[Dict[str, List[str]]] = None,
return_embedding: Optional[bool] = None,
page_number: Optional[int] = None,
page_size: Optional[int] = None,
) -> List[Document]:
We could introduce a `NextToken` class, which is nothing but a container for `page_number` and `page_size`. The `get_documents` function could then return `Tuple[List[Document], NextToken]`, and this `NextToken` can be used as a cursor for iteration. WDYT?
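A hypothetical sketch of that idea; `NextToken`, `get_documents`, and the default page size are illustrative names and values, not part of the current API:

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class NextToken:
    page_number: int
    page_size: int

def get_documents(
    self, index: Optional[str] = None, token: Optional[NextToken] = None
) -> Tuple[List["Document"], Optional[NextToken]]:
    """Return one page of documents plus the token needed to fetch the next page."""
    token = token or NextToken(page_number=0, page_size=10_000)
    documents = self.get_all_documents(
        index=index, page_number=token.page_number, page_size=token.page_size
    )
    # A short page means we reached the end, so no further token is returned.
    next_token = (
        NextToken(token.page_number + 1, token.page_size)
        if len(documents) == token.page_size
        else None
    )
    return documents, next_token
```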
haystack/document_store/faiss.py
Outdated
vector_id = self.faiss_index.ntotal
page_number = 0
for _ in tqdm(range(0, document_count, self.index_buffer_size)):
    documents = self.get_all_documents(index=index, page_number=page_number, page_size=self.index_buffer_size)
If we also add `page_number` and `page_size` as parameters of the `update_embeddings` function, then we don't need `index_buffer_size`. This would also turn `update_embeddings` into a paginated write function.
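A rough sketch of what that could look like, assuming `update_embeddings()` itself accepts `page_number`/`page_size` (a hypothetical signature); the retriever call is an assumed API, the id bookkeeping is simplified, and `update_vector_ids()` is a hypothetical helper:

```python
import numpy as np

def update_embeddings(self, retriever, index=None, page_number=None, page_size=None):
    """Embed and index a single page of documents (or everything if no page is given)."""
    documents = self.get_all_documents(index=index, page_number=page_number, page_size=page_size)
    if not documents:
        return
    embeddings = retriever.embed_passages(documents)            # assumed retriever API
    start_id = self.faiss_index.ntotal                          # next free FAISS vector id
    self.faiss_index.add(np.array(embeddings, dtype="float32"))
    vector_ids = {doc.id: str(start_id + i) for i, doc in enumerate(documents)}
    self.update_vector_ids(vector_ids, index=index)             # hypothetical helper to persist the mapping
```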
if len(documents) == 0:
    logger.warning("Calling DocumentStore.update_embeddings() on an empty index")
    return
document_count = self.get_document_count(index=index)
Not related to this PR, but how will we handle simultaneous/concurrent calls to the `update_embeddings` and `write_documents` functions? Concurrent writes would affect the value returned by `self.get_document_count(index=index)` and could also cause `vector_id` duplication. Mainly: do we need a global lock, since this looks like a critical section?
Yes, we'd need some guards for concurrent use of the `FAISSDocumentStore`. In the next weeks, we also plan to explore whether there's community interest in a document store like Milvus that would already take care of many scenarios pertaining to production deployments.
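For the single-process case, a minimal sketch of such a guard could be a shared lock around the write paths (assuming threads within one process; a multi-process setup would need something like a DB-level lock instead):

```python
import threading

class FAISSDocumentStore:
    def __init__(self, *args, **kwargs):
        self._write_lock = threading.Lock()   # serializes writers within one process

    def write_documents(self, documents, index=None):
        with self._write_lock:                # keep vector_id assignment free of races
            ...                               # existing write logic

    def update_embeddings(self, retriever, index=None):
        with self._write_lock:                # document count stays stable while embedding
            ...                               # existing update logic
```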
Looking good!
This PR introduces a `get_all_documents_generator()` method for the document stores. It yields documents iteratively, making it memory efficient when working with a large number of documents.
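Illustrative usage of the new method (the exact arguments may differ from the final implementation):

```python
# Iterate over a large index without materializing all documents at once.
doc_count = 0
for doc in document_store.get_all_documents_generator(index="document"):
    doc_count += 1
print(f"Streamed {doc_count} documents with a flat memory footprint.")
```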