Max File Size (#3422)
* k

* k

* k
yuhongsun96 authored Dec 11, 2024
1 parent 4e4214b commit ca988f5
Showing 8 changed files with 206 additions and 37 deletions.
15 changes: 11 additions & 4 deletions backend/danswer/configs/app_configs.py
@@ -417,21 +417,28 @@
# We don't want the metadata to overwhelm the actual contents of the chunk
SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true"
# Timeout to wait for job's last update before killing it, in hours
CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT", 3))
CLEANUP_INDEXING_JOBS_TIMEOUT = int(
os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT") or 3
)

# The indexer will warn in the logs whenever a document exceeds this threshold (in bytes)
INDEXING_SIZE_WARNING_THRESHOLD = int(
os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD", 100 * 1024 * 1024)
os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD") or 100 * 1024 * 1024
)

# during indexing, will log verbose memory diff stats every x batches and at the end.
# 0 disables this behavior and is the default.
INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0))
INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL") or 0)

# During an indexing attempt, specifies the number of batches that are allowed to
# fail with an exception without aborting the attempt.
INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0))
INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT") or 0)

# Maximum file size in a document to be indexed
MAX_DOCUMENT_CHARS = int(os.environ.get("MAX_DOCUMENT_CHARS") or 5_000_000)
MAX_FILE_SIZE_BYTES = int(
os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024
) # 2GB in bytes

#####
# Miscellaneous
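A note on the pattern change above: switching from os.environ.get("VAR", default) to os.environ.get("VAR") or default matters when a variable is present but empty, which is exactly what the ${VAR:-} entries in the docker-compose file below produce when the host leaves the variable unset. A minimal sketch of the difference (the variable name is invented for illustration):

import os

# Simulate what `MAX_FILE_SIZE_BYTES=${MAX_FILE_SIZE_BYTES:-}` yields in the
# container when the host does not set the variable: present, but empty.
os.environ["EXAMPLE_LIMIT"] = ""

# With a .get() default, the empty string is returned and int("") raises:
#   int(os.environ.get("EXAMPLE_LIMIT", 3))  ->  ValueError
# With `or`, the empty string is falsy, so the fallback kicks in:
value = int(os.environ.get("EXAMPLE_LIMIT") or 3)
print(value)  # 3
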
15 changes: 14 additions & 1 deletion backend/danswer/connectors/google_drive/connector.py
@@ -4,11 +4,13 @@
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any
from typing import cast

from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore

from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.app_configs import MAX_FILE_SIZE_BYTES
from danswer.configs.constants import DocumentSource
from danswer.connectors.google_drive.doc_conversion import build_slim_document
from danswer.connectors.google_drive.doc_conversion import (
@@ -452,12 +454,14 @@ def _fetch_drive_items(
if isinstance(self.creds, ServiceAccountCredentials)
else self._manage_oauth_retrieval
)
return retrieval_method(
drive_files = retrieval_method(
is_slim=is_slim,
start=start,
end=end,
)

return drive_files

def _extract_docs_from_google_drive(
self,
start: SecondsSinceUnixEpoch | None = None,
@@ -473,6 +477,15 @@ def _extract_docs_from_google_drive(
files_to_process = []
# Gather the files into batches to be processed in parallel
for file in self._fetch_drive_items(is_slim=False, start=start, end=end):
if (
file.get("size")
and int(cast(str, file.get("size"))) > MAX_FILE_SIZE_BYTES
):
logger.warning(
f"Skipping file {file.get('name', 'Unknown')} as it is too large: {file.get('size')} bytes"
)
continue

files_to_process.append(file)
if len(files_to_process) >= LARGE_BATCH_SIZE:
yield from _process_files_batch(
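The size check above depends on the size field added to FILE_FIELDS in the next file. The Drive API returns size as a string and omits it for items without binary content (folders, native Google Docs), hence the file.get("size") guard before the cast. A rough standalone sketch of the same filter, using plain dicts in place of Drive API responses; the keep_file helper and the sample names and sizes are invented for illustration:

MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024  # mirrors the new 2GB default

def keep_file(file: dict) -> bool:
    # "size" arrives as a string and may be absent entirely, so only files
    # that explicitly report a size above the cap are skipped.
    size = file.get("size")
    if size and int(size) > MAX_FILE_SIZE_BYTES:
        print(f"Skipping file {file.get('name', 'Unknown')}: {size} bytes")
        return False
    return True

sample_files = [
    {"name": "notes.pdf", "size": "1048576"},          # ~1MB, kept
    {"name": "backup.iso", "size": str(3 * 1024**3)},  # 3GB, skipped
    {"name": "native-google-doc"},                     # no size field, kept
]
print([f["name"] for f in sample_files if keep_file(f)])
# -> ['notes.pdf', 'native-google-doc']
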
2 changes: 1 addition & 1 deletion backend/danswer/connectors/google_drive/file_retrieval.py
@@ -16,7 +16,7 @@

FILE_FIELDS = (
"nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
"shortcutDetails, owners(emailAddress))"
"shortcutDetails, owners(emailAddress), size)"
)
SLIM_FILE_FIELDS = (
"nextPageToken, files(mimeType, id, name, permissions(emailAddress, type), "
85 changes: 55 additions & 30 deletions backend/danswer/indexing/indexing_pipeline.py
@@ -1,4 +1,5 @@
import traceback
from collections.abc import Callable
from functools import partial
from http import HTTPStatus
from typing import Protocol
@@ -12,6 +13,7 @@
from danswer.access.models import DocumentAccess
from danswer.configs.app_configs import ENABLE_MULTIPASS_INDEXING
from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT
from danswer.configs.app_configs import MAX_DOCUMENT_CHARS
from danswer.configs.constants import DEFAULT_BOOST
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
get_experts_stores_representations,
@@ -202,40 +204,13 @@ def index_doc_batch_with_handler(


def index_doc_batch_prepare(
document_batch: list[Document],
documents: list[Document],
index_attempt_metadata: IndexAttemptMetadata,
db_session: Session,
ignore_time_skip: bool = False,
) -> DocumentBatchPrepareContext | None:
"""Sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
This precedes indexing them into the actual document index."""
documents: list[Document] = []
for document in document_batch:
empty_contents = not any(section.text.strip() for section in document.sections)
if (
(not document.title or not document.title.strip())
and not document.semantic_identifier.strip()
and empty_contents
):
# Skip documents that have neither title nor content
# If the document doesn't have either, then there is no useful information in it
# This is again verified later in the pipeline after chunking but at that point there should
# already be no documents that are empty.
logger.warning(
f"Skipping document with ID {document.id} as it has neither title nor content."
)
continue

if document.title is not None and not document.title.strip() and empty_contents:
# The title is explicitly empty ("" and not None) and the document is empty
# so when building the chunk text representation, it will be empty and unusable
logger.warning(
f"Skipping document with ID {document.id} as the chunks will be empty."
)
continue

documents.append(document)

# Create a trimmed list of docs that don't have a newer updated at
# Shortcuts the time-consuming flow on connector index retries
document_ids: list[str] = [document.id for document in documents]
@@ -282,17 +257,64 @@ def index_doc_batch_prepare(
)


def filter_documents(document_batch: list[Document]) -> list[Document]:
documents: list[Document] = []
for document in document_batch:
empty_contents = not any(section.text.strip() for section in document.sections)
if (
(not document.title or not document.title.strip())
and not document.semantic_identifier.strip()
and empty_contents
):
# Skip documents that have neither title nor content
# If the document doesn't have either, then there is no useful information in it
# This is again verified later in the pipeline after chunking but at that point there should
# already be no documents that are empty.
logger.warning(
f"Skipping document with ID {document.id} as it has neither title nor content."
)
continue

if document.title is not None and not document.title.strip() and empty_contents:
# The title is explicitly empty ("" and not None) and the document is empty
# so when building the chunk text representation, it will be empty and unusable
logger.warning(
f"Skipping document with ID {document.id} as the chunks will be empty."
)
continue

section_chars = sum(len(section.text) for section in document.sections)
if (
MAX_DOCUMENT_CHARS
and len(document.title or document.semantic_identifier) + section_chars
> MAX_DOCUMENT_CHARS
):
# Skip documents that are too long; later on there are more memory-intensive steps done on the text
# and the container will run out of memory and crash. Several other checks are included upstream, but
# those are at the connector level, so a catchall is still needed here.
# The assumption is that files this long are generated files and not the type users
# generally care about.
logger.warning(
f"Skipping document with ID {document.id} as it is too long."
)
continue

documents.append(document)
return documents


@log_function_time(debug_only=True)
def index_doc_batch(
*,
document_batch: list[Document],
chunker: Chunker,
embedder: IndexingEmbedder,
document_index: DocumentIndex,
document_batch: list[Document],
index_attempt_metadata: IndexAttemptMetadata,
db_session: Session,
ignore_time_skip: bool = False,
tenant_id: str | None = None,
filter_fnc: Callable[[list[Document]], list[Document]] = filter_documents,
) -> tuple[int, int]:
"""Takes different pieces of the indexing pipeline and applies it to a batch of documents
Note that the documents should already be batched at this point so that it does not inflate the
@@ -309,8 +331,11 @@ def index_doc_batch(
is_public=False,
)

logger.debug("Filtering Documents")
filtered_documents = filter_fnc(document_batch)

ctx = index_doc_batch_prepare(
document_batch=document_batch,
documents=filtered_documents,
index_attempt_metadata=index_attempt_metadata,
ignore_time_skip=ignore_time_skip,
db_session=db_session,
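The refactor above moves the empty-document checks out of index_doc_batch_prepare into a standalone filter_documents, adds a length cap against MAX_DOCUMENT_CHARS (the title, falling back to the semantic identifier when the title is empty, plus all section text), and exposes the filter through the new filter_fnc keyword so callers can swap it; the new unit tests below exercise filter_documents directly. A rough sketch of just the length arithmetic, using stand-in dataclasses and a made-up helper rather than the real connector models:

from dataclasses import dataclass

MAX_DOCUMENT_CHARS = 5_000_000  # mirrors the new default in app_configs.py

@dataclass
class StubSection:
    text: str

@dataclass
class StubDocument:
    id: str
    semantic_identifier: str
    sections: list[StubSection]
    title: str | None = None

def exceeds_char_limit(document: StubDocument) -> bool:
    # Same arithmetic as the new check in filter_documents: title (falling back
    # to the semantic identifier) plus the text of every section.
    section_chars = sum(len(section.text) for section in document.sections)
    return bool(MAX_DOCUMENT_CHARS) and (
        len(document.title or document.semantic_identifier) + section_chars
        > MAX_DOCUMENT_CHARS
    )

doc = StubDocument(
    id="generated-log",
    semantic_identifier="build-output.log",
    sections=[StubSection(text="x" * 6_000_000)],
)
print(exceeds_char_limit(doc))  # True -> filter_documents would skip this document
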
2 changes: 1 addition & 1 deletion backend/danswer/seeding/load_docs.py
@@ -196,7 +196,7 @@ def seed_initial_documents(
docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)

index_doc_batch_prepare(
document_batch=docs,
documents=docs,
index_attempt_metadata=IndexAttemptMetadata(
connector_id=connector_id,
credential_id=PUBLIC_CREDENTIAL_ID,
120 changes: 120 additions & 0 deletions backend/tests/unit/danswer/indexing/test_indexing_pipeline.py
@@ -0,0 +1,120 @@
from typing import List

from danswer.configs.app_configs import MAX_DOCUMENT_CHARS
from danswer.connectors.models import Document
from danswer.connectors.models import DocumentSource
from danswer.connectors.models import Section
from danswer.indexing.indexing_pipeline import filter_documents


def create_test_document(
doc_id: str = "test_id",
title: str | None = "Test Title",
semantic_id: str = "test_semantic_id",
sections: List[Section] | None = None,
) -> Document:
if sections is None:
sections = [Section(text="Test content", link="test_link")]
return Document(
id=doc_id,
title=title,
semantic_identifier=semantic_id,
sections=sections,
source=DocumentSource.FILE,
metadata={},
)


def test_filter_documents_empty_title_and_content() -> None:
doc = create_test_document(
title="", semantic_id="", sections=[Section(text="", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 0


def test_filter_documents_empty_title_with_content() -> None:
doc = create_test_document(
title="", sections=[Section(text="Valid content", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].id == "test_id"


def test_filter_documents_empty_content_with_title() -> None:
doc = create_test_document(
title="Valid Title", sections=[Section(text="", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].id == "test_id"


def test_filter_documents_exceeding_max_chars() -> None:
if not MAX_DOCUMENT_CHARS: # Skip if no max chars configured
return
long_text = "a" * (MAX_DOCUMENT_CHARS + 1)
doc = create_test_document(sections=[Section(text=long_text, link="test_link")])
result = filter_documents([doc])
assert len(result) == 0


def test_filter_documents_valid_document() -> None:
doc = create_test_document(
title="Valid Title", sections=[Section(text="Valid content", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].id == "test_id"
assert result[0].title == "Valid Title"


def test_filter_documents_whitespace_only() -> None:
doc = create_test_document(
title=" ", semantic_id=" ", sections=[Section(text=" ", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 0


def test_filter_documents_semantic_id_no_title() -> None:
doc = create_test_document(
title=None,
semantic_id="Valid Semantic ID",
sections=[Section(text="Valid content", link="test_link")],
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].semantic_identifier == "Valid Semantic ID"


def test_filter_documents_multiple_sections() -> None:
doc = create_test_document(
sections=[
Section(text="Content 1", link="test_link"),
Section(text="Content 2", link="test_link"),
Section(text="Content 3", link="test_link"),
]
)
result = filter_documents([doc])
assert len(result) == 1
assert len(result[0].sections) == 3


def test_filter_documents_multiple_documents() -> None:
docs = [
create_test_document(doc_id="1", title="Title 1"),
create_test_document(
doc_id="2", title="", sections=[Section(text="", link="test_link")]
), # Should be filtered
create_test_document(doc_id="3", title="Title 3"),
]
result = filter_documents(docs)
assert len(result) == 2
assert {doc.id for doc in result} == {"1", "3"}


def test_filter_documents_empty_batch() -> None:
result = filter_documents([])
assert len(result) == 0
2 changes: 2 additions & 0 deletions deployment/docker_compose/docker-compose.dev.yml
@@ -183,6 +183,8 @@ services:
- GONG_CONNECTOR_START_TIME=${GONG_CONNECTOR_START_TIME:-}
- NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-}
- GITHUB_CONNECTOR_BASE_URL=${GITHUB_CONNECTOR_BASE_URL:-}
- MAX_DOCUMENT_CHARS=${MAX_DOCUMENT_CHARS:-}
- MAX_FILE_SIZE_BYTES=${MAX_FILE_SIZE_BYTES:-}
# Celery Configs (defaults are set in the supervisord.conf file.
# prefer doing that to have one source of defaults)
- CELERY_WORKER_INDEXING_CONCURRENCY=${CELERY_WORKER_INDEXING_CONCURRENCY:-}
2 changes: 2 additions & 0 deletions deployment/kubernetes/env-configmap.yaml
@@ -61,6 +61,8 @@ data:
WEB_CONNECTOR_VALIDATE_URLS: ""
GONG_CONNECTOR_START_TIME: ""
NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP: ""
MAX_DOCUMENT_CHARS: ""
MAX_FILE_SIZE_BYTES: ""
# DanswerBot SlackBot Configs
DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER: ""
DANSWER_BOT_DISPLAY_ERROR_MSGS: ""
