Skip to content

Commit

Permalink
lock to avoid rare serializable errors (#2818)
Browse files Browse the repository at this point in the history
Co-authored-by: Richard Kuo <[email protected]>
  • Loading branch information
rkuo-danswer and LostVector authored Oct 17, 2024
1 parent 114326d commit 0de4870
Showing 1 changed file with 51 additions and 14 deletions.
65 changes: 51 additions & 14 deletions backend/danswer/db/index_attempt.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,55 @@ def mark_attempt_in_progress(
index_attempt: IndexAttempt,
db_session: Session,
) -> None:
index_attempt.status = IndexingStatus.IN_PROGRESS
index_attempt.time_started = index_attempt.time_started or func.now() # type: ignore
db_session.commit()
with db_session.begin_nested():
try:
attempt = db_session.execute(
select(IndexAttempt)
.where(IndexAttempt.id == index_attempt.id)
.with_for_update()
).scalar_one()

attempt.status = IndexingStatus.IN_PROGRESS
attempt.time_started = index_attempt.time_started or func.now() # type: ignore
db_session.commit()
except Exception:
db_session.rollback()


def mark_attempt_succeeded(
index_attempt: IndexAttempt,
db_session: Session,
) -> None:
index_attempt.status = IndexingStatus.SUCCESS
db_session.add(index_attempt)
db_session.commit()
with db_session.begin_nested():
try:
attempt = db_session.execute(
select(IndexAttempt)
.where(IndexAttempt.id == index_attempt.id)
.with_for_update()
).scalar_one()

attempt.status = IndexingStatus.SUCCESS
db_session.commit()
except Exception:
db_session.rollback()


def mark_attempt_partially_succeeded(
index_attempt: IndexAttempt,
db_session: Session,
) -> None:
index_attempt.status = IndexingStatus.COMPLETED_WITH_ERRORS
db_session.add(index_attempt)
db_session.commit()
with db_session.begin_nested():
try:
attempt = db_session.execute(
select(IndexAttempt)
.where(IndexAttempt.id == index_attempt.id)
.with_for_update()
).scalar_one()

attempt.status = IndexingStatus.COMPLETED_WITH_ERRORS
db_session.commit()
except Exception:
db_session.rollback()


def mark_attempt_failed(
Expand All @@ -134,11 +162,20 @@ def mark_attempt_failed(
failure_reason: str = "Unknown",
full_exception_trace: str | None = None,
) -> None:
index_attempt.status = IndexingStatus.FAILED
index_attempt.error_msg = failure_reason
index_attempt.full_exception_trace = full_exception_trace
db_session.add(index_attempt)
db_session.commit()
with db_session.begin_nested():
try:
attempt = db_session.execute(
select(IndexAttempt)
.where(IndexAttempt.id == index_attempt.id)
.with_for_update()
).scalar_one()

attempt.status = IndexingStatus.FAILED
attempt.error_msg = failure_reason
attempt.full_exception_trace = full_exception_trace
db_session.commit()
except Exception:
db_session.rollback()

source = index_attempt.connector_credential_pair.connector.source
optional_telemetry(record_type=RecordType.FAILURE, data={"connector": source})
Expand Down

0 comments on commit 0de4870

Please sign in to comment.