Bugfix/overlapping connectors #3138

Merged: 3 commits, Nov 16, 2024
backend/danswer/background/indexing/run_indexing.py (8 additions, 8 deletions)
@@ -433,22 +433,22 @@ def run_indexing_entrypoint(
     with get_session_with_tenant(tenant_id) as db_session:
         attempt = transition_attempt_to_in_progress(index_attempt_id, db_session)
 
+        tenant_str = ""
+        if tenant_id is not None:
+            tenant_str = f" for tenant {tenant_id}"
+
         logger.info(
-            f"Indexing starting for tenant {tenant_id}: "
-            if tenant_id is not None
-            else ""
-            + f"connector='{attempt.connector_credential_pair.connector.name}' "
+            f"Indexing starting{tenant_str}: "
+            f"connector='{attempt.connector_credential_pair.connector.name}' "
             f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
             f"credentials='{attempt.connector_credential_pair.connector_id}'"
         )
 
         _run_indexing(db_session, attempt, tenant_id, callback)
 
         logger.info(
-            f"Indexing finished for tenant {tenant_id}: "
-            if tenant_id is not None
-            else ""
-            + f"connector='{attempt.connector_credential_pair.connector.name}' "
+            f"Indexing finished{tenant_str}: "
+            f"connector='{attempt.connector_credential_pair.connector.name}' "
             f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
             f"credentials='{attempt.connector_credential_pair.connector_id}'"
         )
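
The bug this hunk fixes is easy to miss: in the old call, the conditional expression bound only to the first f-string, and the remaining implicitly concatenated literals were swallowed by the `else` branch, so the log line always lost either the tenant prefix or the connector details. A minimal standalone sketch of the two patterns, using made-up values rather than the repo's objects:

# Standalone sketch with hypothetical values; it only illustrates the
# operator-precedence issue, not the actual danswer logging call.
tenant_id = None
connector_name = "my-connector"

# Old pattern: parses as `A if cond else ("" + B)`, where B is the implicitly
# concatenated tail. With a tenant set you get only the prefix; without one,
# only the connector details.
old_message = (
    f"Indexing starting for tenant {tenant_id}: "
    if tenant_id is not None
    else ""
    + f"connector='{connector_name}' "
)
print(repr(old_message))  # "connector='my-connector' " -- prefix silently dropped

# New pattern: build the optional tenant suffix first, then concatenate plainly.
tenant_str = "" if tenant_id is None else f" for tenant {tenant_id}"
new_message = f"Indexing starting{tenant_str}: " f"connector='{connector_name}' "
print(repr(new_message))  # "Indexing starting: connector='my-connector' "
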
backend/danswer/db/document.py (9 additions, 20 deletions)
@@ -308,7 +308,7 @@ def get_access_info_for_documents(
     return db_session.execute(stmt).all()  # type: ignore
 
 
-def _upsert_documents(
+def upsert_documents(
     db_session: Session,
     document_metadata_batch: list[DocumentMetadata],
     initial_boost: int = DEFAULT_BOOST,
@@ -364,24 +364,24 @@ def _upsert_documents(
     db_session.commit()
 
 
-def _upsert_document_by_connector_credential_pair(
-    db_session: Session, document_metadata_batch: list[DocumentMetadata]
+def upsert_document_by_connector_credential_pair(
+    db_session: Session, connector_id: int, credential_id: int, document_ids: list[str]
 ) -> None:
     """NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
-    if not document_metadata_batch:
-        logger.info("`document_metadata_batch` is empty. Skipping.")
+    if not document_ids:
+        logger.info("`document_ids` is empty. Skipping.")
         return
 
     insert_stmt = insert(DocumentByConnectorCredentialPair).values(
         [
             model_to_dict(
                 DocumentByConnectorCredentialPair(
-                    id=document_metadata.document_id,
-                    connector_id=document_metadata.connector_id,
-                    credential_id=document_metadata.credential_id,
+                    id=doc_id,
+                    connector_id=connector_id,
+                    credential_id=credential_id,
                 )
             )
-            for document_metadata in document_metadata_batch
+            for doc_id in document_ids
         ]
     )
     # for now, there are no columns to update. If more metadata is added, then this
@@ -442,17 +442,6 @@ def mark_document_as_synced(document_id: str, db_session: Session) -> None:
     db_session.commit()
 
 
-def upsert_documents_complete(
-    db_session: Session,
-    document_metadata_batch: list[DocumentMetadata],
-) -> None:
-    _upsert_documents(db_session, document_metadata_batch)
-    _upsert_document_by_connector_credential_pair(db_session, document_metadata_batch)
-    logger.info(
-        f"Upserted {len(document_metadata_batch)} document store entries into DB"
-    )
-
-
 def delete_document_by_connector_credential_pair__no_commit(
     db_session: Session,
     document_id: str,
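
For orientation, the renamed `upsert_document_by_connector_credential_pair` now takes the connector and credential ids plus a plain list of document ids, so callers no longer need a `DocumentMetadata` batch just to record the association. Below is a hedged sketch of what the helper boils down to, assuming the model import shown and Postgres `ON CONFLICT DO NOTHING` semantics (the diff's comment notes there are currently no columns to update); it is an approximation, not the repo's exact code:

# Approximation of the renamed helper; the real version wraps rows with
# model_to_dict and lives alongside the other danswer.db.document helpers.
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

from danswer.db.models import DocumentByConnectorCredentialPair  # assumed import path


def upsert_document_by_cc_pair_sketch(
    db_session: Session,
    connector_id: int,
    credential_id: int,
    document_ids: list[str],
) -> None:
    if not document_ids:
        return

    stmt = insert(DocumentByConnectorCredentialPair).values(
        [
            {"id": doc_id, "connector_id": connector_id, "credential_id": credential_id}
            for doc_id in document_ids
        ]
    )
    # Rows that already exist for this (document, connector, credential) triple are
    # left untouched, which keeps repeated indexing runs idempotent.
    db_session.execute(stmt.on_conflict_do_nothing())
    db_session.commit()
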
backend/danswer/indexing/indexing_pipeline.py (36 additions, 21 deletions)
@@ -20,7 +20,8 @@
 from danswer.db.document import prepare_to_modify_documents
 from danswer.db.document import update_docs_last_modified__no_commit
 from danswer.db.document import update_docs_updated_at__no_commit
-from danswer.db.document import upsert_documents_complete
+from danswer.db.document import upsert_document_by_connector_credential_pair
+from danswer.db.document import upsert_documents
 from danswer.db.document_set import fetch_document_sets_for_documents
 from danswer.db.index_attempt import create_index_attempt_error
 from danswer.db.models import Document as DBDocument
@@ -62,7 +63,7 @@ def _upsert_documents_in_db(
     db_session: Session,
 ) -> None:
     # Metadata here refers to basic document info, not metadata about the actual content
-    doc_m_batch: list[DocumentMetadata] = []
+    document_metadata_list: list[DocumentMetadata] = []
     for doc in documents:
         first_link = next(
             (section.link for section in doc.sections if section.link), ""
@@ -77,12 +78,9 @@ def _upsert_documents_in_db(
             secondary_owners=get_experts_stores_representations(doc.secondary_owners),
             from_ingestion_api=doc.from_ingestion_api,
         )
-        doc_m_batch.append(db_doc_metadata)
+        document_metadata_list.append(db_doc_metadata)
 
-    upsert_documents_complete(
-        db_session=db_session,
-        document_metadata_batch=doc_m_batch,
-    )
+    upsert_documents(db_session, document_metadata_list)
 
     # Insert document content metadata
     for doc in documents:
@@ -109,7 +107,10 @@ def get_doc_ids_to_update(
     documents: list[Document], db_docs: list[DBDocument]
 ) -> list[Document]:
     """Figures out which documents actually need to be updated. If a document is already present
-    and the `updated_at` hasn't changed, we shouldn't need to do anything with it."""
+    and the `updated_at` hasn't changed, we shouldn't need to do anything with it.
+
+    NB: Still need to associate the document in the DB if multiple connectors are
+    indexing the same doc."""
     id_update_time_map = {
         doc.id: doc.doc_updated_at for doc in db_docs if doc.doc_updated_at
     }
@@ -197,7 +198,7 @@ def index_doc_batch_prepare(
 ) -> DocumentBatchPrepareContext | None:
     """This sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
     This preceeds indexing it into the actual document index."""
-    documents = []
+    documents: list[Document] = []
     for document in document_batch:
         empty_contents = not any(section.text.strip() for section in document.sections)
         if (
@@ -223,32 +224,46 @@ def index_doc_batch_prepare(
         else:
             documents.append(document)
 
-    document_ids = [document.id for document in documents]
+    # Create a trimmed list of docs that don't have a newer updated at
+    # Shortcuts the time-consuming flow on connector index retries
+    document_ids: list[str] = [document.id for document in documents]
     db_docs: list[DBDocument] = get_documents_by_ids(
         db_session=db_session,
         document_ids=document_ids,
     )
 
-    # Skip indexing docs that don't have a newer updated at
-    # Shortcuts the time-consuming flow on connector index retries
     updatable_docs = (
         get_doc_ids_to_update(documents=documents, db_docs=db_docs)
         if not ignore_time_skip
        else documents
     )
 
-    # No docs to update either because the batch is empty or every doc was already indexed
-    if not updatable_docs:
-        return None
+    # for all updatable docs, upsert into the DB
+    # Does not include doc_updated_at which is also used to indicate a successful update
+    if updatable_docs:
+        _upsert_documents_in_db(
+            documents=updatable_docs,
+            index_attempt_metadata=index_attempt_metadata,
+            db_session=db_session,
+        )
 
-    # Create records in the source of truth about these documents,
-    # does not include doc_updated_at which is also used to indicate a successful update
-    _upsert_documents_in_db(
-        documents=documents,
-        index_attempt_metadata=index_attempt_metadata,
-        db_session=db_session,
+    logger.info(
+        f"Upserted {len(updatable_docs)} changed docs out of "
+        f"{len(documents)} total docs into the DB"
     )
 
+    # for all docs, upsert the document to cc pair relationship
+    upsert_document_by_connector_credential_pair(
+        db_session,
+        index_attempt_metadata.connector_id,
+        index_attempt_metadata.credential_id,
+        document_ids,
+    )
+
+    # No docs to process because the batch is empty or every doc was already indexed
+    if not updatable_docs:
+        return None
+
     id_to_db_doc_map = {doc.id: doc for doc in db_docs}
     return DocumentBatchPrepareContext(
         updatable_docs=updatable_docs, id_to_db_doc_map=id_to_db_doc_map
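
Taken together, the `index_doc_batch_prepare` changes reorder the work so that the document-to-cc-pair association is written for every document in the batch before the function is allowed to short-circuit; previously the early return fired first, so a second connector indexing documents another connector had already indexed never got associated with them. A condensed, hypothetical sketch of the new ordering (same helper names and argument shapes as in the diff, but simplified and assuming the module's existing imports):

# Hypothetical condensation of the flow above, not the repo's exact function body.
def prepare_batch_sketch(documents, db_docs, index_attempt_metadata, db_session):
    document_ids = [document.id for document in documents]

    # Trim to docs that are new or carry a newer updated_at.
    updatable_docs = get_doc_ids_to_update(documents=documents, db_docs=db_docs)

    # 1) Upsert source-of-truth document rows only for the changed docs.
    if updatable_docs:
        _upsert_documents_in_db(
            documents=updatable_docs,
            index_attempt_metadata=index_attempt_metadata,
            db_session=db_session,
        )

    # 2) Always upsert the doc -> (connector, credential) association, even when every
    #    doc was already indexed via an overlapping connector. This is the actual bugfix.
    upsert_document_by_connector_credential_pair(
        db_session,
        index_attempt_metadata.connector_id,
        index_attempt_metadata.credential_id,
        document_ids,
    )

    # 3) Only after that association exists is it safe to skip the heavier indexing flow.
    if not updatable_docs:
        return None
    return updatable_docs
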
@@ -48,6 +48,59 @@ def test_connector_creation(reset: None) -> None:
     assert cc_pair_info.creator_email == admin_user.email
 
 
+# TODO(rkuo): will enable this once i have credentials on github
+# def test_overlapping_connector_creation(reset: None) -> None:
+#     # Creating an admin user (first user created is automatically an admin)
+#     admin_user: DATestUser = UserManager.create(name="admin_user")
+
+#     config = {
+#         "wiki_base": os.environ["CONFLUENCE_TEST_SPACE_URL"],
+#         "space": os.environ["CONFLUENCE_TEST_SPACE"],
+#         "is_cloud": True,
+#         "page_id": "",
+#     }
+
+#     credential = {
+#         "confluence_username": os.environ["CONFLUENCE_USER_NAME"],
+#         "confluence_access_token": os.environ["CONFLUENCE_ACCESS_TOKEN"],
+#     }
+
+#     # store the time before we create the connector so that we know after
+#     # when the indexing should have started
+#     now = datetime.now(timezone.utc)
+
+#     # create connector
+#     cc_pair_1 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.CONFLUENCE,
+#         connector_specific_config=config,
+#         credential_json=credential,
+#         user_performing_action=admin_user,
+#     )
+
+#     CCPairManager.wait_for_indexing(
+#         cc_pair_1, now, timeout=60, user_performing_action=admin_user
+#     )
+
+#     cc_pair_2 = CCPairManager.create_from_scratch(
+#         source=DocumentSource.CONFLUENCE,
+#         connector_specific_config=config,
+#         credential_json=credential,
+#         user_performing_action=admin_user,
+#     )
+
+#     CCPairManager.wait_for_indexing(
+#         cc_pair_2, now, timeout=60, user_performing_action=admin_user
+#     )
+
+#     info_1 = CCPairManager.get_single(cc_pair_1.id)
+#     assert info_1
+
+#     info_2 = CCPairManager.get_single(cc_pair_2.id)
+#     assert info_2
+
+#     assert info_1.num_docs_indexed == info_2.num_docs_indexed
+
+
 def test_connector_deletion(reset: None, vespa_client: vespa_fixture) -> None:
     # Creating an admin user (first user created is automatically an admin)
     admin_user: DATestUser = UserManager.create(name="admin_user")