Max File Size #3422

Merged · 3 commits · Dec 11, 2024
15 changes: 11 additions & 4 deletions backend/danswer/configs/app_configs.py
@@ -411,21 +411,28 @@
 # We don't want the metadata to overwhelm the actual contents of the chunk
 SKIP_METADATA_IN_CHUNK = os.environ.get("SKIP_METADATA_IN_CHUNK", "").lower() == "true"
 # Timeout to wait for job's last update before killing it, in hours
-CLEANUP_INDEXING_JOBS_TIMEOUT = int(os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT", 3))
+CLEANUP_INDEXING_JOBS_TIMEOUT = int(
+    os.environ.get("CLEANUP_INDEXING_JOBS_TIMEOUT") or 3
+)

 # The indexer will warn in the logs whenver a document exceeds this threshold (in bytes)
 INDEXING_SIZE_WARNING_THRESHOLD = int(
-    os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD", 100 * 1024 * 1024)
+    os.environ.get("INDEXING_SIZE_WARNING_THRESHOLD") or 100 * 1024 * 1024
 )

 # during indexing, will log verbose memory diff stats every x batches and at the end.
 # 0 disables this behavior and is the default.
-INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL", 0))
+INDEXING_TRACER_INTERVAL = int(os.environ.get("INDEXING_TRACER_INTERVAL") or 0)

 # During an indexing attempt, specifies the number of batches which are allowed to
 # exception without aborting the attempt.
-INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0))
+INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT") or 0)

+# Maximum file size in a document to be indexed
+MAX_DOCUMENT_CHARS = int(os.environ.get("MAX_DOCUMENT_CHARS") or 5_000_000)
+MAX_FILE_SIZE_BYTES = int(
+    os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024
+) # 2GB in bytes

 #####
 # Miscellaneous
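A note on the pattern above: with docker-compose entries like `MAX_FILE_SIZE_BYTES=${MAX_FILE_SIZE_BYTES:-}`, an unset variable reaches the container as an empty string, so the two-argument `os.environ.get(VAR, default)` would hand `int()` an empty string and crash. The `or` form falls back to the default instead. A minimal sketch of the difference (variable name taken from the diff, values illustrative):

```python
import os

# Simulate what ${MAX_FILE_SIZE_BYTES:-} produces when the operator leaves it unset:
# the key exists in the environment but is empty.
os.environ["MAX_FILE_SIZE_BYTES"] = ""

# Two-argument default: "" counts as a present value, so the default is ignored
# and int("") raises ValueError.
try:
    int(os.environ.get("MAX_FILE_SIZE_BYTES", 2 * 1024 * 1024 * 1024))
except ValueError:
    print("empty string defeats the two-argument default")

# `or` form: "" is falsy, so evaluation falls through to the default.
max_file_size = int(os.environ.get("MAX_FILE_SIZE_BYTES") or 2 * 1024 * 1024 * 1024)
print(max_file_size)  # 2147483648
```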
15 changes: 14 additions & 1 deletion backend/danswer/connectors/google_drive/connector.py
@@ -4,11 +4,13 @@
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 from typing import Any
+from typing import cast

 from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
 from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore

 from danswer.configs.app_configs import INDEX_BATCH_SIZE
+from danswer.configs.app_configs import MAX_FILE_SIZE_BYTES
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.google_drive.doc_conversion import build_slim_document
 from danswer.connectors.google_drive.doc_conversion import (
@@ -452,12 +454,14 @@ def _fetch_drive_items(
             if isinstance(self.creds, ServiceAccountCredentials)
             else self._manage_oauth_retrieval
         )
-        return retrieval_method(
+        drive_files = retrieval_method(
             is_slim=is_slim,
             start=start,
             end=end,
         )

+        return drive_files
+
     def _extract_docs_from_google_drive(
         self,
         start: SecondsSinceUnixEpoch | None = None,
@@ -473,6 +477,15 @@ def _extract_docs_from_google_drive(
         files_to_process = []
         # Gather the files into batches to be processed in parallel
         for file in self._fetch_drive_items(is_slim=False, start=start, end=end):
+            if (
+                file.get("size")
+                and int(cast(str, file.get("size"))) > MAX_FILE_SIZE_BYTES
+            ):
+                logger.warning(
+                    f"Skipping file {file.get('name', 'Unknown')} as it is too large: {file.get('size')} bytes"
+                )
+                continue
+
             files_to_process.append(file)
             if len(files_to_process) >= LARGE_BATCH_SIZE:
                 yield from _process_files_batch(
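The guard above only works because `size` is now part of the requested field projection (next file), and because Drive reports `size` as a string and can omit it (folders and shortcuts, for example), hence the truthiness check before the cast. A standalone sketch of the same gate, with made-up file dicts shaped like `files.list` results:

```python
from typing import Any

MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024  # mirrors the new config default (2GB)


def too_large(file: dict[str, Any]) -> bool:
    # Drive returns "size" as a string and may omit it entirely,
    # so an absent value must not trip the comparison.
    size = file.get("size")
    return bool(size) and int(size) > MAX_FILE_SIZE_BYTES


print(too_large({"name": "backup.tar", "size": str(3 * 1024**3)}))  # True -> skipped
print(too_large({"name": "My Folder"}))                             # False -> kept
```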
2 changes: 1 addition & 1 deletion backend/danswer/connectors/google_drive/file_retrieval.py
@@ -16,7 +16,7 @@

 FILE_FIELDS = (
     "nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
-    "shortcutDetails, owners(emailAddress))"
+    "shortcutDetails, owners(emailAddress), size)"
 )
 SLIM_FILE_FIELDS = (
     "nextPageToken, files(mimeType, id, name, permissions(emailAddress, type), "
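`FILE_FIELDS` is the fields projection sent with the Drive `files().list` calls, so without `size` listed here the connector's new check would never see a value. A rough sketch of how such a projection is consumed; `drive_service` is assumed to be a googleapiclient Drive v3 resource built elsewhere, so this is illustrative rather than the connector's actual call site:

```python
FILE_FIELDS = (
    "nextPageToken, files(mimeType, id, name, permissions, modifiedTime, webViewLink, "
    "shortcutDetails, owners(emailAddress), size)"
)


def print_file_sizes(drive_service) -> None:
    # Only fields named in the projection come back in the response.
    response = drive_service.files().list(pageSize=100, fields=FILE_FIELDS).execute()
    for file in response.get("files", []):
        print(file.get("name"), file.get("size"))
```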
85 changes: 55 additions & 30 deletions backend/danswer/indexing/indexing_pipeline.py
@@ -1,4 +1,5 @@
 import traceback
+from collections.abc import Callable
 from functools import partial
 from http import HTTPStatus
 from typing import Protocol
@@ -12,6 +13,7 @@
 from danswer.access.models import DocumentAccess
 from danswer.configs.app_configs import ENABLE_MULTIPASS_INDEXING
 from danswer.configs.app_configs import INDEXING_EXCEPTION_LIMIT
+from danswer.configs.app_configs import MAX_DOCUMENT_CHARS
 from danswer.configs.constants import DEFAULT_BOOST
 from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
     get_experts_stores_representations,
@@ -202,40 +204,13 @@ def index_doc_batch_with_handler(


 def index_doc_batch_prepare(
-    document_batch: list[Document],
+    documents: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
     db_session: Session,
     ignore_time_skip: bool = False,
 ) -> DocumentBatchPrepareContext | None:
     """Sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
     This preceeds indexing it into the actual document index."""
-    documents: list[Document] = []
-    for document in document_batch:
-        empty_contents = not any(section.text.strip() for section in document.sections)
-        if (
-            (not document.title or not document.title.strip())
-            and not document.semantic_identifier.strip()
-            and empty_contents
-        ):
-            # Skip documents that have neither title nor content
-            # If the document doesn't have either, then there is no useful information in it
-            # This is again verified later in the pipeline after chunking but at that point there should
-            # already be no documents that are empty.
-            logger.warning(
-                f"Skipping document with ID {document.id} as it has neither title nor content."
-            )
-            continue
-
-        if document.title is not None and not document.title.strip() and empty_contents:
-            # The title is explicitly empty ("" and not None) and the document is empty
-            # so when building the chunk text representation, it will be empty and unuseable
-            logger.warning(
-                f"Skipping document with ID {document.id} as the chunks will be empty."
-            )
-            continue
-
-        documents.append(document)
-
     # Create a trimmed list of docs that don't have a newer updated at
     # Shortcuts the time-consuming flow on connector index retries
     document_ids: list[str] = [document.id for document in documents]
@@ -282,17 +257,64 @@ def index_doc_batch_prepare(
     )


+def filter_documents(document_batch: list[Document]) -> list[Document]:
+    documents: list[Document] = []
+    for document in document_batch:
+        empty_contents = not any(section.text.strip() for section in document.sections)
+        if (
+            (not document.title or not document.title.strip())
+            and not document.semantic_identifier.strip()
+            and empty_contents
+        ):
+            # Skip documents that have neither title nor content
+            # If the document doesn't have either, then there is no useful information in it
+            # This is again verified later in the pipeline after chunking but at that point there should
+            # already be no documents that are empty.
+            logger.warning(
+                f"Skipping document with ID {document.id} as it has neither title nor content."
+            )
+            continue
+
+        if document.title is not None and not document.title.strip() and empty_contents:
+            # The title is explicitly empty ("" and not None) and the document is empty
+            # so when building the chunk text representation, it will be empty and unuseable
+            logger.warning(
+                f"Skipping document with ID {document.id} as the chunks will be empty."
+            )
+            continue
+
+        section_chars = sum(len(section.text) for section in document.sections)
+        if (
+            MAX_DOCUMENT_CHARS
+            and len(document.title or document.semantic_identifier) + section_chars
+            > MAX_DOCUMENT_CHARS
+        ):
+            # Skip documents that are too long, later on there are more memory intensive steps done on the text
+            # and the container will run out of memory and crash. Several other checks are included upstream but
+            # those are at the connector level so a catchall is still needed.
+            # Assumption here is that files that are that long, are generated files and not the type users
+            # generally care for.
+            logger.warning(
+                f"Skipping document with ID {document.id} as it is too long."
+            )
+            continue
+
+        documents.append(document)
+    return documents
+
+
 @log_function_time(debug_only=True)
 def index_doc_batch(
     *,
-    document_batch: list[Document],
     chunker: Chunker,
     embedder: IndexingEmbedder,
     document_index: DocumentIndex,
+    document_batch: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
     db_session: Session,
     ignore_time_skip: bool = False,
     tenant_id: str | None = None,
+    filter_fnc: Callable[[list[Document]], list[Document]] = filter_documents,
 ) -> tuple[int, int]:
     """Takes different pieces of the indexing pipeline and applies it to a batch of documents
     Note that the documents should already be batched at this point so that it does not inflate the
@@ -309,8 +331,11 @@ def index_doc_batch(
         is_public=False,
     )

+    logger.debug("Filtering Documents")
+    filtered_documents = filter_fnc(document_batch)
+
     ctx = index_doc_batch_prepare(
-        document_batch=document_batch,
+        documents=filtered_documents,
         index_attempt_metadata=index_attempt_metadata,
         ignore_time_skip=ignore_time_skip,
         db_session=db_session,
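Because the filtering step is now injected through `filter_fnc`, callers (the unit tests below exercise `filter_documents` directly) can swap in or extend the default behavior without touching the rest of the pipeline. A hypothetical sketch of such a wrapper; the DRAFT-prefix rule is invented purely for illustration:

```python
from danswer.connectors.models import Document
from danswer.indexing.indexing_pipeline import filter_documents


def skip_drafts(document_batch: list[Document]) -> list[Document]:
    # Apply the stock empty/too-long checks first, then an extra made-up rule.
    kept = filter_documents(document_batch)
    return [doc for doc in kept if not doc.semantic_identifier.startswith("DRAFT")]


# Wired in via the new keyword, leaving every other argument as the pipeline
# already passes it:
#   index_doc_batch(..., filter_fnc=skip_drafts)
```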
2 changes: 1 addition & 1 deletion backend/danswer/seeding/load_docs.py
@@ -196,7 +196,7 @@ def seed_initial_documents(
     docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)

     index_doc_batch_prepare(
-        document_batch=docs,
+        documents=docs,
         index_attempt_metadata=IndexAttemptMetadata(
             connector_id=connector_id,
             credential_id=PUBLIC_CREDENTIAL_ID,
120 changes: 120 additions & 0 deletions backend/tests/unit/danswer/indexing/test_indexing_pipeline.py
@@ -0,0 +1,120 @@
from typing import List

from danswer.configs.app_configs import MAX_DOCUMENT_CHARS
from danswer.connectors.models import Document
from danswer.connectors.models import DocumentSource
from danswer.connectors.models import Section
from danswer.indexing.indexing_pipeline import filter_documents


def create_test_document(
doc_id: str = "test_id",
title: str | None = "Test Title",
semantic_id: str = "test_semantic_id",
sections: List[Section] | None = None,
) -> Document:
if sections is None:
sections = [Section(text="Test content", link="test_link")]
return Document(
id=doc_id,
title=title,
semantic_identifier=semantic_id,
sections=sections,
source=DocumentSource.FILE,
metadata={},
)


def test_filter_documents_empty_title_and_content() -> None:
doc = create_test_document(
title="", semantic_id="", sections=[Section(text="", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 0


def test_filter_documents_empty_title_with_content() -> None:
doc = create_test_document(
title="", sections=[Section(text="Valid content", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].id == "test_id"


def test_filter_documents_empty_content_with_title() -> None:
doc = create_test_document(
title="Valid Title", sections=[Section(text="", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].id == "test_id"


def test_filter_documents_exceeding_max_chars() -> None:
if not MAX_DOCUMENT_CHARS: # Skip if no max chars configured
return
long_text = "a" * (MAX_DOCUMENT_CHARS + 1)
doc = create_test_document(sections=[Section(text=long_text, link="test_link")])
result = filter_documents([doc])
assert len(result) == 0


def test_filter_documents_valid_document() -> None:
doc = create_test_document(
title="Valid Title", sections=[Section(text="Valid content", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].id == "test_id"
assert result[0].title == "Valid Title"


def test_filter_documents_whitespace_only() -> None:
doc = create_test_document(
title=" ", semantic_id=" ", sections=[Section(text=" ", link="test_link")]
)
result = filter_documents([doc])
assert len(result) == 0


def test_filter_documents_semantic_id_no_title() -> None:
doc = create_test_document(
title=None,
semantic_id="Valid Semantic ID",
sections=[Section(text="Valid content", link="test_link")],
)
result = filter_documents([doc])
assert len(result) == 1
assert result[0].semantic_identifier == "Valid Semantic ID"


def test_filter_documents_multiple_sections() -> None:
doc = create_test_document(
sections=[
Section(text="Content 1", link="test_link"),
Section(text="Content 2", link="test_link"),
Section(text="Content 3", link="test_link"),
]
)
result = filter_documents([doc])
assert len(result) == 1
assert len(result[0].sections) == 3


def test_filter_documents_multiple_documents() -> None:
docs = [
create_test_document(doc_id="1", title="Title 1"),
create_test_document(
doc_id="2", title="", sections=[Section(text="", link="test_link")]
), # Should be filtered
create_test_document(doc_id="3", title="Title 3"),
]
result = filter_documents(docs)
assert len(result) == 2
assert {doc.id for doc in result} == {"1", "3"}


def test_filter_documents_empty_batch() -> None:
result = filter_documents([])
assert len(result) == 0
2 changes: 2 additions & 0 deletions deployment/docker_compose/docker-compose.dev.yml
@@ -183,6 +183,8 @@ services:
       - GONG_CONNECTOR_START_TIME=${GONG_CONNECTOR_START_TIME:-}
       - NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-}
       - GITHUB_CONNECTOR_BASE_URL=${GITHUB_CONNECTOR_BASE_URL:-}
+      - MAX_DOCUMENT_CHARS=${MAX_DOCUMENT_CHARS:-}
+      - MAX_FILE_SIZE_BYTES=${MAX_FILE_SIZE_BYTES:-}
       # Celery Configs (defaults are set in the supervisord.conf file.
       # prefer doing that to have one source of defaults)
       - CELERY_WORKER_INDEXING_CONCURRENCY=${CELERY_WORKER_INDEXING_CONCURRENCY:-}
2 changes: 2 additions & 0 deletions deployment/kubernetes/env-configmap.yaml
@@ -61,6 +61,8 @@ data:
   WEB_CONNECTOR_VALIDATE_URLS: ""
   GONG_CONNECTOR_START_TIME: ""
   NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP: ""
+  MAX_DOCUMENT_CHARS: ""
+  MAX_FILE_SIZE_BYTES: ""
   # DanswerBot SlackBot Configs
   DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER: ""
   DANSWER_BOT_DISPLAY_ERROR_MSGS: ""