Skip to content

Commit

Permalink
fix(api): refactor document loaders for generic ingestion and extract…
Browse files Browse the repository at this point in the history
…ion (#142)

This PR refactors all minio, sharepoint, samba, s3 document loaders
- to use generic document loaders.
- to have unique document_ids generated from from a combination of datasource and self_reference

Closes #143
  • Loading branch information
mawandm authored Aug 21, 2024
1 parent d6135df commit 399bfb1
Show file tree
Hide file tree
Showing 12 changed files with 1,216 additions and 1,058 deletions.
125 changes: 117 additions & 8 deletions nesis/api/core/document_loaders/loader_helper.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
import uuid
import datetime
import json
from nesis.api.core.models.entities import Document
from nesis.api.core.util.dateutil import strptime
import logging
from nesis.api.core.services.util import get_document, delete_document
import uuid
from typing import Optional, Dict, Any, Callable

import nesis.api.core.util.http as http
from nesis.api.core.document_loaders.runners import (
IngestRunner,
ExtractRunner,
RagRunner,
)
from nesis.api.core.models.entities import Document, Datasource
from nesis.api.core.services.util import delete_document
from nesis.api.core.services.util import (
get_document,
)
from nesis.api.core.util.constants import DEFAULT_DATETIME_FORMAT
from nesis.api.core.util.dateutil import strptime

_LOG = logging.getLogger(__name__)


def upload_document_to_llm(upload_document, file_metadata, rag_endpoint, http_client):
return _upload_document_to_pgpt(
upload_document, file_metadata, rag_endpoint, http_client
)
return _upload_document(upload_document, file_metadata, rag_endpoint, http_client)


def _upload_document_to_pgpt(upload_document, file_metadata, rag_endpoint, http_client):
def _upload_document(upload_document, file_metadata, rag_endpoint, http_client):
document_id = file_metadata["unique_id"]
file_name = file_metadata["name"]

Expand Down Expand Up @@ -51,3 +62,101 @@ def _upload_document_to_pgpt(upload_document, file_metadata, rag_endpoint, http_
request_object = {"file_name": file_name, "text": upload_document.page_content}
response = http_client.post(url=f"{rag_endpoint}/v1/ingest", payload=request_object)
return json.loads(response)


class DocumentProcessor(object):
def __init__(
self,
config,
http_client: http.HttpClient,
datasource: Datasource,
):
self._datasource = datasource

# This is left package public for testing
self._extract_runner: ExtractRunner = Optional[None]
_ingest_runner = IngestRunner(config=config, http_client=http_client)

if self._datasource.connection.get("destination") is not None:
self._extract_runner = ExtractRunner(
config=config,
http_client=http_client,
destination=self._datasource.connection.get("destination"),
)

self._mode = self._datasource.connection.get("mode") or "ingest"

match self._mode:
case "ingest":
self._ingest_runners: list[RagRunner] = [_ingest_runner]
case "extract":
self._ingest_runners: list[RagRunner] = [self._extract_runner]
case _:
raise ValueError(
f"Invalid mode {self._mode}. Expected 'ingest' or 'extract'"
)

def sync(
self,
endpoint: str,
file_path: str,
last_modified: datetime.datetime,
metadata: Dict[str, Any],
store_metadata: Dict[str, Any],
) -> None:
"""
Here we check if this file has been updated.
If the file has been updated, we delete it from the vector store and re-ingest the new updated file
"""
document_id = str(
uuid.uuid5(
uuid.NAMESPACE_DNS, f"{self._datasource.uuid}:{metadata['self_link']}"
)
)
document: Document = get_document(document_id=document_id)
for _ingest_runner in self._ingest_runners:
try:
response_json = _ingest_runner.run(
file_path=file_path,
metadata=metadata,
document_id=None if document is None else document.uuid,
last_modified=last_modified.replace(tzinfo=None).replace(
microsecond=0
),
datasource=self._datasource,
)
except ValueError:
_LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
response_json = None
except UserWarning:
_LOG.warning(f"File {file_path} is already processing")
continue

if response_json is None:
_LOG.warning("No response from ingest runner received")
continue

_ingest_runner.save(
document_id=document_id,
datasource_id=self._datasource.uuid,
filename=store_metadata["filename"],
base_uri=endpoint,
rag_metadata=response_json,
store_metadata=store_metadata,
last_modified=last_modified,
)

def unsync(self, clean: Callable) -> None:
endpoint = self._datasource.connection.get("endpoint")

for _ingest_runner in self._ingest_runners:
documents = _ingest_runner.get(base_uri=endpoint)
for document in documents:
store_metadata = document.store_metadata
try:
rag_metadata = document.rag_metadata
except AttributeError:
rag_metadata = document.extract_metadata

if clean(store_metadata=store_metadata):
_ingest_runner.delete(document=document, rag_metadata=rag_metadata)
156 changes: 39 additions & 117 deletions nesis/api/core/document_loaders/minio.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,39 @@
import concurrent
import concurrent.futures
import logging
import multiprocessing
import logging
import os
import queue
import tempfile
from typing import Dict, Any, Optional
from typing import Dict, Any

import memcache
import minio
from minio import Minio

import nesis.api.core.util.http as http
from nesis.api.core.document_loaders.runners import (
IngestRunner,
ExtractRunner,
RagRunner,
)
from nesis.api.core.models.entities import Document, Datasource
from nesis.api.core.services.util import (
get_document,
get_documents,
)
from nesis.api.core.document_loaders.loader_helper import DocumentProcessor
from nesis.api.core.models.entities import Datasource
from nesis.api.core.util import clean_control, isblank
from nesis.api.core.util.concurrency import (
IOBoundPool,
as_completed,
BlockingThreadPoolExecutor,
)
from nesis.api.core.util.constants import DEFAULT_DATETIME_FORMAT

_LOG = logging.getLogger(__name__)


class MinioProcessor(object):
class MinioProcessor(DocumentProcessor):
def __init__(
self,
config,
http_client: http.HttpClient,
cache_client: memcache.Client,
datasource: Datasource,
):
super().__init__(config, http_client, datasource)
self._config = config
self._http_client = http_client
self._cache_client = cache_client
self._datasource = datasource

# This is left public for testing
self._extract_runner: ExtractRunner = Optional[None]
_ingest_runner = IngestRunner(config=config, http_client=http_client)
if self._datasource.connection.get("destination") is not None:
self._extract_runner = ExtractRunner(
config=config,
http_client=http_client,
destination=self._datasource.connection.get("destination"),
)
self._ingest_runners = []

self._ingest_runners = [IngestRunner(config=config, http_client=http_client)]

self._mode = self._datasource.connection.get("mode") or "ingest"

match self._mode:
case "ingest":
self._ingest_runners: list[RagRunner] = [_ingest_runner]
case "extract":
self._ingest_runners: list[RagRunner] = [self._extract_runner]
case _:
raise ValueError(
f"Invalid mode {self._mode}. Expected 'ingest' or 'extract'"
)

def run(self, metadata: Dict[str, Any]):
connection: Dict[str, str] = self._datasource.connection
try:
Expand All @@ -93,7 +56,6 @@ def run(self, metadata: Dict[str, Any]):
)
self._unsync_documents(
client=_minio_client,
connection=connection,
)
except:
_LOG.exception("Error fetching sharepoint documents")
Expand Down Expand Up @@ -192,52 +154,22 @@ def _sync_document(
file_path=file_path,
)

"""
Here we check if this file has been updated.
If the file has been updated, we delete it from the vector store and re-ingest the new updated file
"""
document: Document = get_document(document_id=item.etag)
document_id = None if document is None else document.uuid

for _ingest_runner in self._ingest_runners:
try:
response_json = _ingest_runner.run(
file_path=file_path,
metadata=metadata,
document_id=document_id,
last_modified=item.last_modified.replace(tzinfo=None).replace(
microsecond=0
),
datasource=datasource,
)
except ValueError:
_LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
response_json = None
except UserWarning:
_LOG.debug(f"File {file_path} is already processing")
return

if response_json is None:
return

_ingest_runner.save(
document_id=item.etag,
datasource_id=datasource.uuid,
filename=item.object_name,
base_uri=endpoint,
rag_metadata=response_json,
store_metadata={
"bucket_name": item.bucket_name,
"object_name": item.object_name,
"etag": item.etag,
"size": item.size,
"last_modified": item.last_modified.strftime(
DEFAULT_DATETIME_FORMAT
),
"version_id": item.version_id,
},
last_modified=item.last_modified,
)
self.sync(
endpoint,
file_path,
item.last_modified,
metadata,
store_metadata={
"bucket_name": item.bucket_name,
"object_name": item.object_name,
"filename": item.object_name,
"size": item.size,
"last_modified": item.last_modified.strftime(
DEFAULT_DATETIME_FORMAT
),
"version_id": item.version_id,
},
)

_LOG.info(
f"Done {self._mode}ing object {item.object_name} in bucket {bucket_name}"
Expand All @@ -254,37 +186,27 @@ def _sync_document(
def _unsync_documents(
self,
client: Minio,
connection: dict,
) -> None:

try:
endpoint = connection.get("endpoint")

for _ingest_runner in self._ingest_runners:
documents = _ingest_runner.get(base_uri=endpoint)
for document in documents:
store_metadata = document.store_metadata
try:
rag_metadata = document.rag_metadata
except AttributeError:
rag_metadata = document.extract_metadata
bucket_name = store_metadata["bucket_name"]
object_name = store_metadata["object_name"]
try:
client.stat_object(
bucket_name=bucket_name, object_name=object_name
)
except Exception as ex:
str_ex = str(ex)
if "NoSuchKey" in str_ex and "does not exist" in str_ex:
_ingest_runner.delete(
document=document, rag_metadata=rag_metadata
)
else:
raise
def clean(**kwargs):
store_metadata = kwargs["store_metadata"]
try:
client.stat_object(
bucket_name=store_metadata["bucket_name"],
object_name=store_metadata["object_name"],
)
return False
except Exception as ex:
str_ex = str(ex)
if "NoSuchKey" in str_ex and "does not exist" in str_ex:
return True
else:
raise

try:
self.unsync(clean=clean)
except:
_LOG.warn("Error fetching and updating documents", exc_info=True)
_LOG.warning("Error fetching and updating documents", exc_info=True)


def validate_connection_info(connection: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
4 changes: 3 additions & 1 deletion nesis/api/core/document_loaders/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,16 @@ def run(
) -> Union[Dict[str, Any], None]:

if document_id is not None:
_LOG.debug(f"Checking if document {document_id} is modified")
_is_modified = self._is_modified(
document_id=document_id, last_modified=last_modified
)
if _is_modified is None or not _is_modified:
_LOG.debug(f"Document {document_id} is not modified")
return

url = f"{self._rag_endpoint}/v1/extractions/text"

_LOG.debug(f"Document {document_id} is modified, performing extraction")
response = self._http_client.upload(
url=url,
filepath=file_path,
Expand Down
Loading

0 comments on commit 399bfb1

Please sign in to comment.