From 33ea26daf302fc69a4878afbd9fc6c1ba3b9d534 Mon Sep 17 00:00:00 2001
From: "Richard Kuo (Danswer)"
Date: Thu, 14 Nov 2024 17:49:14 -0800
Subject: [PATCH 1/3] fix tenant logging

---
 .../danswer/background/indexing/run_indexing.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py
index 2cd1fc07e2a..8f3ae65fa3d 100644
--- a/backend/danswer/background/indexing/run_indexing.py
+++ b/backend/danswer/background/indexing/run_indexing.py
@@ -433,11 +433,13 @@ def run_indexing_entrypoint(
     with get_session_with_tenant(tenant_id) as db_session:
         attempt = transition_attempt_to_in_progress(index_attempt_id, db_session)
 
+        tenant_str = ""
+        if tenant_id is not None:
+            tenant_str = f" for tenant {tenant_id}"
+
         logger.info(
-            f"Indexing starting for tenant {tenant_id}: "
-            if tenant_id is not None
-            else ""
-            + f"connector='{attempt.connector_credential_pair.connector.name}' "
+            f"Indexing starting{tenant_str}: "
+            f"connector='{attempt.connector_credential_pair.connector.name}' "
             f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
             f"credentials='{attempt.connector_credential_pair.connector_id}'"
         )
@@ -445,10 +447,8 @@
         _run_indexing(db_session, attempt, tenant_id, callback)
 
         logger.info(
-            f"Indexing finished for tenant {tenant_id}: "
-            if tenant_id is not None
-            else ""
-            + f"connector='{attempt.connector_credential_pair.connector.name}' "
+            f"Indexing finished{tenant_str}: "
+            f"connector='{attempt.connector_credential_pair.connector.name}' "
             f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
             f"credentials='{attempt.connector_credential_pair.connector_id}'"
         )
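
A note on the bug fixed above: the conditional expression binds lower than `+`, so the old call evaluated as `A if cond else (B + C)`. The connector/config/credential details were dropped whenever a tenant id was present, and the "Indexing starting" prefix was dropped whenever it was not. The patch precomputes the optional suffix once instead. A minimal standalone sketch of the same pattern follows; the helper name and its arguments are hypothetical, not code from this repo.

# Minimal sketch of the "compute the optional suffix once" pattern (hypothetical names).
from typing import Optional


def build_start_message(tenant_id: Optional[str], connector_name: str) -> str:
    tenant_str = ""
    if tenant_id is not None:
        tenant_str = f" for tenant {tenant_id}"
    # Adjacent f-strings concatenate cleanly; no ternary/`+` precedence trap.
    return f"Indexing starting{tenant_str}: " f"connector='{connector_name}'"


print(build_start_message("tenant-1", "my-connector"))  # both parts present
print(build_start_message(None, "my-connector"))        # suffix simply omitted
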
Skipping.") return insert_stmt = insert(DocumentByConnectorCredentialPair).values( [ model_to_dict( DocumentByConnectorCredentialPair( - id=document_metadata.document_id, - connector_id=document_metadata.connector_id, - credential_id=document_metadata.credential_id, + id=doc_id, + connector_id=connector_id, + credential_id=credential_id, ) ) - for document_metadata in document_metadata_batch + for doc_id in document_ids ] ) # for now, there are no columns to update. If more metadata is added, then this @@ -442,17 +442,6 @@ def mark_document_as_synced(document_id: str, db_session: Session) -> None: db_session.commit() -def upsert_documents_complete( - db_session: Session, - document_metadata_batch: list[DocumentMetadata], -) -> None: - _upsert_documents(db_session, document_metadata_batch) - _upsert_document_by_connector_credential_pair(db_session, document_metadata_batch) - logger.info( - f"Upserted {len(document_metadata_batch)} document store entries into DB" - ) - - def delete_document_by_connector_credential_pair__no_commit( db_session: Session, document_id: str, diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py index 2324edf4d03..688650b2936 100644 --- a/backend/danswer/indexing/indexing_pipeline.py +++ b/backend/danswer/indexing/indexing_pipeline.py @@ -20,7 +20,8 @@ from danswer.db.document import prepare_to_modify_documents from danswer.db.document import update_docs_last_modified__no_commit from danswer.db.document import update_docs_updated_at__no_commit -from danswer.db.document import upsert_documents_complete +from danswer.db.document import upsert_document_by_connector_credential_pair +from danswer.db.document import upsert_documents from danswer.db.document_set import fetch_document_sets_for_documents from danswer.db.index_attempt import create_index_attempt_error from danswer.db.models import Document as DBDocument @@ -62,7 +63,7 @@ def _upsert_documents_in_db( db_session: Session, ) -> None: # Metadata here refers to basic document info, not metadata about the actual content - doc_m_batch: list[DocumentMetadata] = [] + document_metadata_list: list[DocumentMetadata] = [] for doc in documents: first_link = next( (section.link for section in doc.sections if section.link), "" @@ -77,12 +78,9 @@ def _upsert_documents_in_db( secondary_owners=get_experts_stores_representations(doc.secondary_owners), from_ingestion_api=doc.from_ingestion_api, ) - doc_m_batch.append(db_doc_metadata) + document_metadata_list.append(db_doc_metadata) - upsert_documents_complete( - db_session=db_session, - document_metadata_batch=doc_m_batch, - ) + upsert_documents(db_session, document_metadata_list) # Insert document content metadata for doc in documents: @@ -109,7 +107,10 @@ def get_doc_ids_to_update( documents: list[Document], db_docs: list[DBDocument] ) -> list[Document]: """Figures out which documents actually need to be updated. If a document is already present - and the `updated_at` hasn't changed, we shouldn't need to do anything with it.""" + and the `updated_at` hasn't changed, we shouldn't need to do anything with it. + + NB: Still need to associate the document in the DB if multiple connectors are + indexing the same doc.""" id_update_time_map = { doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at } @@ -197,7 +198,7 @@ def index_doc_batch_prepare( ) -> DocumentBatchPrepareContext | None: """This sets up the documents in the relational DB (source of truth) for permissions, metadata, etc. 
From 2a492c3155aa72b0046338e92969cde426247323 Mon Sep 17 00:00:00 2001
From: "Richard Kuo (Danswer)"
Date: Thu, 14 Nov 2024 19:42:49 -0800
Subject: [PATCH 3/3] better logging and rough cut at testing

---
 backend/danswer/indexing/indexing_pipeline.py | 20 ++++---
 .../connector/test_connector_deletion.py      | 53 +++++++++++++++++++
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py
index 688650b2936..f23bf91d2f1 100644
--- a/backend/danswer/indexing/indexing_pipeline.py
+++ b/backend/danswer/indexing/indexing_pipeline.py
@@ -238,15 +238,21 @@ def index_doc_batch_prepare(
         else documents
     )
 
-    # Create a record in the DB for every updateable document.
+    # for all updatable docs, upsert into the DB
     # Does not include doc_updated_at which is also used to indicate a successful update
-    _upsert_documents_in_db(
-        documents=updatable_docs,
-        index_attempt_metadata=index_attempt_metadata,
-        db_session=db_session,
+    if updatable_docs:
+        _upsert_documents_in_db(
+            documents=updatable_docs,
+            index_attempt_metadata=index_attempt_metadata,
+            db_session=db_session,
+        )
+
+    logger.info(
+        f"Upserted {len(updatable_docs)} changed docs out of "
+        f"{len(documents)} total docs into the DB"
     )
 
-    # Upsert the document to cc pair relationship for all documents
+    # for all docs, upsert the document to cc pair relationship
     upsert_document_by_connector_credential_pair(
         db_session,
         index_attempt_metadata.connector_id,
@@ -254,8 +260,6 @@ def index_doc_batch_prepare(
         document_ids,
     )
 
-    logger.info(f"Upserted {len(updatable_docs)} documents into the DB")
-
     # No docs to process because the batch is empty or every doc was already indexed
     if not updatable_docs:
         return None
diff --git a/backend/tests/integration/tests/connector/test_connector_deletion.py b/backend/tests/integration/tests/connector/test_connector_deletion.py
index 663aedfc335..b14a75e0045 100644
--- a/backend/tests/integration/tests/connector/test_connector_deletion.py
+++ b/backend/tests/integration/tests/connector/test_connector_deletion.py
@@ -48,6 +48,59 @@ def test_connector_creation(reset: None) -> None:
     assert cc_pair_info.creator_email == admin_user.email
 
 
+# TODO(rkuo): will enable this once i have credentials on github
+# def test_overlapping_connector_creation(reset: None) -> None:
+#     # Creating an admin user (first user created is automatically an admin)
+#     admin_user: DATestUser = UserManager.create(name="admin_user")
+
+#     config = {
+#         "wiki_base": os.environ["CONFLUENCE_TEST_SPACE_URL"],
+#         "space": os.environ["CONFLUENCE_TEST_SPACE"],
+#         "is_cloud": True,
+#         "page_id": "",
+#     }
+
+#     credential = {
+#         "confluence_username": os.environ["CONFLUENCE_USER_NAME"],
+#         "confluence_access_token": os.environ["CONFLUENCE_ACCESS_TOKEN"],
+#     }
+
+#     # store the time before we create the connector so that we know after
+#     # when the indexing should have started
+#     now = datetime.now(timezone.utc)
+
+#     # create connector
+#     cc_pair_1 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.CONFLUENCE,
+#         connector_specific_config=config,
+#         credential_json=credential,
+#         user_performing_action=admin_user,
+#     )
+
+#     CCPairManager.wait_for_indexing(
+#         cc_pair_1, now, timeout=60, user_performing_action=admin_user
+#     )
+
+#     cc_pair_2 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.CONFLUENCE,
+#         connector_specific_config=config,
+#         credential_json=credential,
+#         user_performing_action=admin_user,
+#     )
+
+#     CCPairManager.wait_for_indexing(
+#         cc_pair_2, now, timeout=60, user_performing_action=admin_user
+#     )
+
+#     info_1 = CCPairManager.get_single(cc_pair_1.id)
+#     assert info_1
+
+#     info_2 = CCPairManager.get_single(cc_pair_2.id)
+#     assert info_2
+
+#     assert info_1.num_docs_indexed == info_2.num_docs_indexed
+
+
 def test_connector_deletion(reset: None, vespa_client: vespa_fixture) -> None:
     # Creating an admin user (first user created is automatically an admin)
     admin_user: DATestUser = UserManager.create(name="admin_user")
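
For sanity-checking the "always upsert the doc to cc-pair relationship" behavior without the Confluence credentials the commented-out test above needs, one option is to count the mapping rows for each pair directly in the DB. A rough, hypothetical helper follows; it is not part of this PR, and it assumes `DocumentByConnectorCredentialPair` is importable from `danswer.db.models`, matching how backend/danswer/db/document.py uses that model.

# Hypothetical check helper -- not part of this PR.
from sqlalchemy import func, select
from sqlalchemy.orm import Session

from danswer.db.models import DocumentByConnectorCredentialPair  # assumed import path


def count_cc_pair_document_rows(
    db_session: Session, connector_id: int, credential_id: int
) -> int:
    # Counts document <-> cc-pair mapping rows for one connector/credential pair.
    stmt = (
        select(func.count())
        .select_from(DocumentByConnectorCredentialPair)
        .where(
            DocumentByConnectorCredentialPair.connector_id == connector_id,
            DocumentByConnectorCredentialPair.credential_id == credential_id,
        )
    )
    return db_session.scalar(stmt) or 0

After indexing the same source under two cc pairs, both counts should match even though the second run skips re-upserting unchanged document rows, which is the behavior the logging added in this commit makes visible.
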