Refactor s3 document processor
mawandm committed Aug 16, 2024 · 1 parent f21996e · commit d073e53
Showing 5 changed files with 380 additions and 451 deletions.
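At its core, the change lifts the shared DocumentProcessor base class out of minio.py into loader_helper.py and makes its sync method store-agnostic: instead of receiving a Minio listing entry, it now takes the object's last-modified timestamp plus a store_metadata dict. A condensed view of the signature change follows; the Before/After class names are only for illustration and are not part of the commit.

import datetime
from typing import Any, Dict


class Before:
    # Previously defined in minio.py and coupled to a Minio listing entry ("item")
    def sync(self, endpoint, file_path, item, metadata) -> None: ...


class After:
    # Now defined in loader_helper.py; store-specific details travel in store_metadata,
    # which must include "filename" because save() reads store_metadata["filename"]
    def sync(
        self,
        endpoint: str,
        file_path: str,
        last_modified: datetime.datetime,
        metadata: Dict[str, Any],
        store_metadata: Dict[str, Any],
    ) -> None: ...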
125 changes: 117 additions & 8 deletions nesis/api/core/document_loaders/loader_helper.py
@@ -1,20 +1,31 @@
-import uuid
 import datetime
 import json
-from nesis.api.core.models.entities import Document
-from nesis.api.core.util.dateutil import strptime
 import logging
-from nesis.api.core.services.util import get_document, delete_document
+import uuid
+from typing import Optional, Dict, Any, Callable
+
+import nesis.api.core.util.http as http
+from nesis.api.core.document_loaders.runners import (
+    IngestRunner,
+    ExtractRunner,
+    RagRunner,
+)
+from nesis.api.core.models.entities import Document, Datasource
+from nesis.api.core.services.util import delete_document
+from nesis.api.core.services.util import (
+    get_document,
+)
+from nesis.api.core.util.constants import DEFAULT_DATETIME_FORMAT
+from nesis.api.core.util.dateutil import strptime
 
 _LOG = logging.getLogger(__name__)
 
 
 def upload_document_to_llm(upload_document, file_metadata, rag_endpoint, http_client):
-    return _upload_document_to_pgpt(
-        upload_document, file_metadata, rag_endpoint, http_client
-    )
+    return _upload_document(upload_document, file_metadata, rag_endpoint, http_client)
 
 
-def _upload_document_to_pgpt(upload_document, file_metadata, rag_endpoint, http_client):
+def _upload_document(upload_document, file_metadata, rag_endpoint, http_client):
document_id = file_metadata["unique_id"]
file_name = file_metadata["name"]

@@ -51,3 +62,101 @@ def _upload_document_to_pgpt(upload_document, file_metadata, rag_endpoint, http_client):
request_object = {"file_name": file_name, "text": upload_document.page_content}
response = http_client.post(url=f"{rag_endpoint}/v1/ingest", payload=request_object)
return json.loads(response)


class DocumentProcessor(object):
def __init__(
self,
config,
http_client: http.HttpClient,
datasource: Datasource,
):
self._datasource = datasource

# This is left package public for testing
self._extract_runner: ExtractRunner = Optional[None]
_ingest_runner = IngestRunner(config=config, http_client=http_client)

if self._datasource.connection.get("destination") is not None:
self._extract_runner = ExtractRunner(
config=config,
http_client=http_client,
destination=self._datasource.connection.get("destination"),
)

self._mode = self._datasource.connection.get("mode") or "ingest"

match self._mode:
case "ingest":
self._ingest_runners: list[RagRunner] = [_ingest_runner]
case "extract":
self._ingest_runners: list[RagRunner] = [self._extract_runner]
case _:
raise ValueError(
f"Invalid mode {self._mode}. Expected 'ingest' or 'extract'"
)

def sync(
self,
endpoint: str,
file_path: str,
last_modified: datetime.datetime,
metadata: Dict[str, Any],
store_metadata: Dict[str, Any],
) -> None:
"""
Here we check if this file has been updated.
If the file has been updated, we delete it from the vector store and re-ingest the new updated file
"""
document_id = str(
uuid.uuid5(
uuid.NAMESPACE_DNS, f"{self._datasource.uuid}/{metadata['self_link']}"
)
)
document: Document = get_document(document_id=document_id)
for _ingest_runner in self._ingest_runners:
try:
response_json = _ingest_runner.run(
file_path=file_path,
metadata=metadata,
document_id=None if document is None else document.uuid,
last_modified=last_modified.replace(tzinfo=None).replace(
microsecond=0
),
datasource=self._datasource,
)
except ValueError:
_LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
response_json = None
except UserWarning:
_LOG.warning(f"File {file_path} is already processing")
continue

if response_json is None:
_LOG.warning("No response from ingest runner received")
continue

_ingest_runner.save(
document_id=document_id,
datasource_id=self._datasource.uuid,
filename=store_metadata["filename"],
base_uri=endpoint,
rag_metadata=response_json,
store_metadata=store_metadata,
last_modified=last_modified,
)

def unsync(self, clean: Callable) -> None:
endpoint = self._datasource.connection.get("endpoint")

for _ingest_runner in self._ingest_runners:
documents = _ingest_runner.get(base_uri=endpoint)
for document in documents:
store_metadata = document.store_metadata
try:
rag_metadata = document.rag_metadata
except AttributeError:
rag_metadata = document.extract_metadata

if clean(store_metadata=store_metadata):
_ingest_runner.delete(document=document, rag_metadata=rag_metadata)
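With the base class above, a store-specific loader only has to walk its own store and hand sync/unsync the details they need, which is what MinioProcessor does in the next file. The following is a minimal sketch of that pattern, not committed code: FileRecord, ExampleStoreProcessor, ingest_all, prune, exists_in_store, and the literal date format are all hypothetical stand-ins. The grounded parts are the contract itself: metadata must carry "self_link" because sync() derives the deterministic uuid5 document id from it, and store_metadata must carry "filename" because save() reads it.

from typing import Any, Callable, Dict, Iterable

from nesis.api.core.document_loaders.loader_helper import DocumentProcessor


class FileRecord:
    """Hypothetical listing entry standing in for a Minio/S3 object summary."""

    def __init__(self, name, path, self_link, last_modified, size):
        self.name = name
        self.path = path
        self.self_link = self_link
        self.last_modified = last_modified
        self.size = size


class ExampleStoreProcessor(DocumentProcessor):
    """Illustrative subclass only; MinioProcessor below follows the same shape."""

    def ingest_all(self, endpoint: str, records: Iterable[FileRecord]) -> None:
        for record in records:
            # "self_link" feeds the deterministic uuid5 document id computed in sync()
            metadata: Dict[str, Any] = {"self_link": record.self_link}
            self.sync(
                endpoint,
                record.path,
                record.last_modified,
                metadata,
                store_metadata={
                    "filename": record.name,  # required: save() reads store_metadata["filename"]
                    "size": record.size,
                    "last_modified": record.last_modified.strftime("%Y-%m-%d %H:%M:%S"),
                },
            )

    def prune(self, exists_in_store: Callable[[Dict[str, Any]], bool]) -> None:
        # unsync() deletes any document whose clean(store_metadata=...) callback returns True
        self.unsync(clean=lambda **kwargs: not exists_in_store(kwargs["store_metadata"]))

Everything else — update detection, extract-versus-ingest mode, and the vector-store bookkeeping — stays in the base class.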
194 changes: 35 additions & 159 deletions nesis/api/core/document_loaders/minio.py
@@ -1,136 +1,25 @@
import concurrent
import concurrent.futures
import logging
import multiprocessing
import logging
import os
import queue
import tempfile
import uuid
-from typing import Dict, Any, Optional
+from typing import Dict, Any

import memcache
import minio
from minio import Minio

import nesis.api.core.util.http as http
-from nesis.api.core.document_loaders.runners import (
-    IngestRunner,
-    ExtractRunner,
-    RagRunner,
-)
-from nesis.api.core.models.entities import Document, Datasource
-from nesis.api.core.services.util import (
-    get_document,
-    get_documents,
-)
+from nesis.api.core.document_loaders.loader_helper import DocumentProcessor
+from nesis.api.core.models.entities import Datasource
from nesis.api.core.util import clean_control, isblank
from nesis.api.core.util.concurrency import (
IOBoundPool,
as_completed,
BlockingThreadPoolExecutor,
)
from nesis.api.core.util.constants import DEFAULT_DATETIME_FORMAT

_LOG = logging.getLogger(__name__)


class DocumentProcessor(object):
def __init__(
self,
config,
http_client: http.HttpClient,
datasource: Datasource,
):
self._datasource = datasource

# This is left package public for testing
self._extract_runner: ExtractRunner = Optional[None]
_ingest_runner = IngestRunner(config=config, http_client=http_client)

if self._datasource.connection.get("destination") is not None:
self._extract_runner = ExtractRunner(
config=config,
http_client=http_client,
destination=self._datasource.connection.get("destination"),
)

self._mode = self._datasource.connection.get("mode") or "ingest"

match self._mode:
case "ingest":
self._ingest_runners: list[RagRunner] = [_ingest_runner]
case "extract":
self._ingest_runners: list[RagRunner] = [self._extract_runner]
case _:
raise ValueError(
f"Invalid mode {self._mode}. Expected 'ingest' or 'extract'"
)

def sync(self, endpoint, file_path, item, metadata):
"""
Here we check if this file has been updated.
If the file has been updated, we delete it from the vector store and re-ingest the new updated file
"""
document_id = str(
uuid.uuid5(uuid.NAMESPACE_DNS, f"{self._datasource.uuid}/{item.etag}")
)
document: Document = get_document(document_id=document_id)
for _ingest_runner in self._ingest_runners:
try:
response_json = _ingest_runner.run(
file_path=file_path,
metadata=metadata,
document_id=None if document is None else document.uuid,
last_modified=item.last_modified.replace(tzinfo=None).replace(
microsecond=0
),
datasource=self._datasource,
)
except ValueError:
_LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
response_json = None
except UserWarning:
_LOG.warning(f"File {file_path} is already processing")
continue

if response_json is None:
_LOG.warning("No response from ingest runner received")
continue

_ingest_runner.save(
document_id=document_id,
datasource_id=self._datasource.uuid,
filename=item.object_name,
base_uri=endpoint,
rag_metadata=response_json,
store_metadata={
"bucket_name": item.bucket_name,
"object_name": item.object_name,
"size": item.size,
"last_modified": item.last_modified.strftime(
DEFAULT_DATETIME_FORMAT
),
"version_id": item.version_id,
},
last_modified=item.last_modified,
)

def unsync(self, clean):
endpoint = self._datasource.connection.get("endpoint")

for _ingest_runner in self._ingest_runners:
documents = _ingest_runner.get(base_uri=endpoint)
for document in documents:
store_metadata = document.store_metadata
try:
rag_metadata = document.rag_metadata
except AttributeError:
rag_metadata = document.extract_metadata

if clean(store_metadata=store_metadata):
_ingest_runner.delete(document=document, rag_metadata=rag_metadata)


class MinioProcessor(DocumentProcessor):
def __init__(
self,
@@ -167,7 +56,6 @@ def run(self, metadata: Dict[str, Any]):
)
             self._unsync_documents(
                 client=_minio_client,
-                connection=connection,
             )
except:
_LOG.exception("Error fetching sharepoint documents")
@@ -266,7 +154,22 @@ def _sync_document(
file_path=file_path,
)

-            self.sync(endpoint, file_path, item, metadata)
+            self.sync(
+                endpoint,
+                file_path,
+                item.last_modified,
+                metadata,
+                store_metadata={
+                    "bucket_name": item.bucket_name,
+                    "object_name": item.object_name,
+                    "filename": item.object_name,
+                    "size": item.size,
+                    "last_modified": item.last_modified.strftime(
+                        DEFAULT_DATETIME_FORMAT
+                    ),
+                    "version_id": item.version_id,
+                },
+            )

_LOG.info(
f"Done {self._mode}ing object {item.object_name} in bucket {bucket_name}"
@@ -283,52 +186,25 @@ def _sync_document(
     def _unsync_documents(
         self,
         client: Minio,
-        connection: dict,
     ) -> None:

-        try:
-            # endpoint = connection.get("endpoint")
-            #
-            # for _ingest_runner in self._ingest_runners:
-            #     documents = _ingest_runner.get(base_uri=endpoint)
-            #     for document in documents:
-            #         store_metadata = document.store_metadata
-            #         try:
-            #             rag_metadata = document.rag_metadata
-            #         except AttributeError:
-            #             rag_metadata = document.extract_metadata
-            #         bucket_name = store_metadata["bucket_name"]
-            #         object_name = store_metadata["object_name"]
-            #         try:
-            #             client.stat_object(
-            #                 bucket_name=bucket_name, object_name=object_name
-            #             )
-            #         except Exception as ex:
-            #             str_ex = str(ex)
-            #             if "NoSuchKey" in str_ex and "does not exist" in str_ex:
-            #                 _ingest_runner.delete(
-            #                     document=document, rag_metadata=rag_metadata
-            #                 )
-            #             else:
-            #                 raise
-
-            def clean(**kwargs):
-                store_metadata = kwargs["store_metadata"]
-                try:
-                    client.stat_object(
-                        bucket_name=store_metadata["bucket_name"],
-                        object_name=store_metadata["object_name"],
-                    )
-                    return False
-                except Exception as ex:
-                    str_ex = str(ex)
-                    if "NoSuchKey" in str_ex and "does not exist" in str_ex:
-                        return True
-                    else:
-                        raise
+        def clean(**kwargs):
+            store_metadata = kwargs["store_metadata"]
+            try:
+                client.stat_object(
+                    bucket_name=store_metadata["bucket_name"],
+                    object_name=store_metadata["object_name"],
+                )
+                return False
+            except Exception as ex:
+                str_ex = str(ex)
+                if "NoSuchKey" in str_ex and "does not exist" in str_ex:
+                    return True
+                else:
+                    raise
 
+        try:
             self.unsync(clean=clean)
 
         except:
             _LOG.warning("Error fetching and updating documents", exc_info=True)

