Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/monitor exceptions #2830

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions backend/danswer/background/celery/tasks/vespa/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.document import count_documents_by_needs_sync
from danswer.db.document import get_document
from danswer.db.document import get_document_ids_for_connector_credential_pair
from danswer.db.document import mark_document_as_synced
from danswer.db.document_set import delete_document_set
from danswer.db.document_set import delete_document_set_cc_pair_relationship__no_commit
Expand Down Expand Up @@ -363,7 +364,7 @@ def monitor_connector_deletion_taskset(

count = cast(int, r.scard(rcd.taskset_key))
task_logger.info(
f"Connector deletion progress: cc_pair_id={cc_pair_id} remaining={count} initial={initial_count}"
f"Connector deletion progress: cc_pair={cc_pair_id} remaining={count} initial={initial_count}"
)
if count > 0:
return
Expand All @@ -372,16 +373,27 @@ def monitor_connector_deletion_taskset(
cc_pair = get_connector_credential_pair_from_id(cc_pair_id, db_session)
if not cc_pair:
task_logger.warning(
f"monitor_connector_deletion_taskset - cc_pair_id not found: cc_pair_id={cc_pair_id}"
f"Connector deletion - cc_pair not found: cc_pair={cc_pair_id}"
)
return

try:
doc_ids = get_document_ids_for_connector_credential_pair(
db_session, cc_pair.connector_id, cc_pair.credential_id
)
if len(doc_ids) > 0:
# if this happens, documents somehow got added while deletion was in progress. Likely a bug
# gating off pruning and indexing work before deletion starts
task_logger.warning(
f"Connector deletion - documents still found after taskset completion: "
f"cc_pair={cc_pair_id} num={len(doc_ids)}"
)

# clean up the rest of the related Postgres entities
# index attempts
delete_index_attempts(
db_session=db_session,
cc_pair_id=cc_pair.id,
cc_pair_id=cc_pair_id,
)

# document sets
Expand All @@ -398,7 +410,7 @@ def monitor_connector_deletion_taskset(
noop_fallback,
)
cleanup_user_groups(
cc_pair_id=cc_pair.id,
cc_pair_id=cc_pair_id,
db_session=db_session,
)

Expand All @@ -420,20 +432,21 @@ def monitor_connector_deletion_taskset(
db_session.delete(connector)
db_session.commit()
except Exception as e:
db_session.rollback()
stack_trace = traceback.format_exc()
error_message = f"Error: {str(e)}\n\nStack Trace:\n{stack_trace}"
add_deletion_failure_message(db_session, cc_pair.id, error_message)
add_deletion_failure_message(db_session, cc_pair_id, error_message)
task_logger.exception(
f"Failed to run connector_deletion. "
f"cc_pair_id={cc_pair_id} connector_id={cc_pair.connector_id} credential_id={cc_pair.credential_id}"
f"cc_pair={cc_pair_id} connector={cc_pair.connector_id} credential={cc_pair.credential_id}"
)
raise e

task_logger.info(
f"Successfully deleted cc_pair: "
f"cc_pair_id={cc_pair_id} "
f"connector_id={cc_pair.connector_id} "
f"credential_id={cc_pair.credential_id} "
f"cc_pair={cc_pair_id} "
f"connector={cc_pair.connector_id} "
f"credential={cc_pair.credential_id} "
f"docs_deleted={initial_count}"
)

Expand Down
5 changes: 3 additions & 2 deletions backend/danswer/db/document_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def mark_document_set_as_to_be_deleted(

def delete_document_set_cc_pair_relationship__no_commit(
connector_id: int, credential_id: int, db_session: Session
) -> None:
) -> int:
"""Deletes all rows from DocumentSet__ConnectorCredentialPair where the
connector_credential_pair_id matches the given cc_pair_id."""
delete_stmt = delete(DocumentSet__ConnectorCredentialPair).where(
Expand All @@ -409,7 +409,8 @@ def delete_document_set_cc_pair_relationship__no_commit(
== ConnectorCredentialPair.id,
)
)
db_session.execute(delete_stmt)
result = db_session.execute(delete_stmt)
return result.rowcount # type: ignore


def fetch_document_sets(
Expand Down