diff --git a/nesis/api/core/config.py b/nesis/api/core/config.py
index f80a4cc..95108df 100644
--- a/nesis/api/core/config.py
+++ b/nesis/api/core/config.py
@@ -29,7 +29,7 @@
     "memcache": {
         "hosts": [os.environ.get("NESIS_MEMCACHE_HOSTS", "127.0.0.1:11211")],
         "session": {
-            "expiry": os.environ.get("NESIS_API_MEMCACHE_SESSION_EXPIRY") or 1800
+            "expiry": os.environ.get("NESIS_API_MEMCACHE_SESSION_EXPIRY") or 21600
         },
         "cache": {
             "timeout_default": 300,
diff --git a/nesis/api/core/document_loaders/minio.py b/nesis/api/core/document_loaders/minio.py
index 83572a6..30b1b43 100644
--- a/nesis/api/core/document_loaders/minio.py
+++ b/nesis/api/core/document_loaders/minio.py
@@ -106,7 +106,6 @@ def _sync_documents(
 ) -> None:

     try:
-        connection = datasource.connection

         # Data objects allow us to specify bucket names
         bucket_names = connection.get("dataobjects")
@@ -213,11 +212,14 @@ def _sync_document(
             )
         except ValueError:
             _LOG.warning(f"File {file_path} ingestion failed", exc_info=True)
-            response_json = {}
+            response_json = None
         except UserWarning:
             _LOG.debug(f"File {file_path} is already processing")
             return

+        if response_json is None:
+            return
+
         _ingest_runner.save(
             document_id=item.etag,
             datasource_id=datasource.uuid,
diff --git a/nesis/api/core/document_loaders/runners.py b/nesis/api/core/document_loaders/runners.py
index 01e4167..baf408c 100644
--- a/nesis/api/core/document_loaders/runners.py
+++ b/nesis/api/core/document_loaders/runners.py
@@ -102,15 +102,17 @@ def _is_modified(
         """
         documents = self._extraction_store.get(document_id=document_id)
         for document in documents:
-            if document is None or document.last_modified < last_modified:
-                return False
-            try:
-                self.delete(document=document)
-            except:
-                _LOG.warning(
-                    f"Failed to delete document {document_id}'s record. Continuing anyway..."
-                )
-            return True
+            if document is not None and last_modified.replace(
+                microsecond=0
+            ) > document.last_modified.replace(microsecond=0):
+                try:
+                    self.delete(document=document)
+                except:
+                    _LOG.warning(
+                        f"Failed to delete document {document_id}'s record. Continuing anyway..."
+                    )
+                return True
+        return False

     def save(self, **kwargs):
         return self._extraction_store.save(
@@ -181,25 +183,28 @@ def _is_modified(
         document: Document = get_document(document_id=document_id)
         if document is None or document.base_uri != endpoint:
             return False
-        elif document.base_uri == endpoint:
-            store_metadata = document.store_metadata
-            if store_metadata and store_metadata.get("last_modified"):
-                if (
-                    not strptime(date_string=store_metadata["last_modified"]).replace(
-                        tzinfo=None
-                    )
-                    < last_modified
-                ):
-                    return False
-                try:
-                    self.delete(document=document)
-                except:
-                    _LOG.warning(
-                        f"Failed to delete document {document_id}'s record. Continuing anyway..."
-                    )
-                return True
-        else:
-            return None
+        store_metadata = document.store_metadata
+        document_last_modified = document.last_modified
+        if (
+            document_last_modified is None
+            and store_metadata is not None
+            and store_metadata.get("last_modified")
+        ):
+            document_last_modified = strptime(
+                date_string=store_metadata["last_modified"]
+            ).replace(tzinfo=None)
+        if document_last_modified is not None and last_modified.replace(
+            microsecond=0
+        ) > document_last_modified.replace(microsecond=0):
+            try:
+                self.delete(document=document, rag_metadata=document.rag_metadata)
+            except:
+                _LOG.warning(
+                    f"Failed to delete document {document_id}'s record. Continuing anyway...",
+                    exc_info=True,
+                )
+            return True
+        return False

     def save(self, **kwargs) -> Document:
         return save_document(
@@ -209,6 +214,7 @@ def save(self, **kwargs) -> Document:
             rag_metadata=kwargs["rag_metadata"],
             store_metadata=kwargs["store_metadata"],
             last_modified=kwargs["last_modified"],
+            datasource_id=kwargs["datasource_id"],
         )

     def delete(self, document: Document, **kwargs) -> None:
@@ -222,10 +228,11 @@ def delete(self, document: Document, **kwargs) -> None:
                     for document_data in rag_metadata.get("data") or []
                 ]
             )
-            _LOG.info(f"Deleting document {document.filename}")
-            delete_document(document_id=document.id)
         except:
             _LOG.warning(
                 f"Failed to delete document {document.filename}",
                 exc_info=True,
             )
+
+        _LOG.info(f"Deleting document {document.filename}")
+        delete_document(document_id=document.id)
diff --git a/nesis/api/core/document_loaders/s3.py b/nesis/api/core/document_loaders/s3.py
index 66fa10f..7d6f274 100644
--- a/nesis/api/core/document_loaders/s3.py
+++ b/nesis/api/core/document_loaders/s3.py
@@ -8,7 +8,7 @@
 import memcache

 import nesis.api.core.util.http as http
-from nesis.api.core.models.entities import Document
+from nesis.api.core.models.entities import Document, Datasource
 from nesis.api.core.services import util
 from nesis.api.core.services.util import (
     save_document,
@@ -24,13 +24,14 @@
 def fetch_documents(
-    connection: Dict[str, str],
+    datasource: Datasource,
     rag_endpoint: str,
     http_client: http.HttpClient,
     cache_client: memcache.Client,
     metadata: Dict[str, Any],
 ) -> None:
     try:
+        connection = datasource.connection
         endpoint = connection.get("endpoint")
         access_key = connection.get("user")
         secret_key = connection.get("password")
@@ -61,7 +62,7 @@ def fetch_documents(
         _sync_documents(
             client=s3_client,
-            connection=connection,
+            datasource=datasource,
             rag_endpoint=rag_endpoint,
             http_client=http_client,
             cache_client=cache_client,
@@ -79,7 +80,7 @@ def fetch_documents(

 def _sync_documents(
     client,
-    connection: dict,
+    datasource: Datasource,
     rag_endpoint: str,
     http_client: http.HttpClient,
     cache_client: memcache.Client,
@@ -89,6 +90,7 @@ def _sync_documents(

     try:
         # Data objects allow us to specify bucket names
+        connection = datasource.connection
         bucket_paths = connection.get("dataobjects")
         if bucket_paths is None:
             _LOG.warning("No bucket names supplied, so I can't do much")
@@ -138,7 +140,7 @@ def _sync_documents(
                 try:
                     _sync_document(
                         client=client,
-                        connection=connection,
+                        datasource=datasource,
                         rag_endpoint=rag_endpoint,
                         http_client=http_client,
                         metadata=_metadata,
@@ -158,13 +160,14 @@


 def _sync_document(
     client,
-    connection: dict,
+    datasource: Datasource,
     rag_endpoint: str,
     http_client: http.HttpClient,
     metadata: dict,
     bucket_name: str,
     item,
 ):
+    connection = datasource.connection
     endpoint = connection["endpoint"]
     _metadata = metadata
@@ -235,6 +238,7 @@ def _sync_document(
         save_document(
             document_id=item["ETag"],
             filename=item["Key"],
+            datasource_id=datasource.uuid,
             base_uri=endpoint,
             rag_metadata=response_json,
             store_metadata={
diff --git a/nesis/api/core/document_loaders/sharepoint.py b/nesis/api/core/document_loaders/sharepoint.py
index 3c715df..1ab496a 100644
--- a/nesis/api/core/document_loaders/sharepoint.py
+++ b/nesis/api/core/document_loaders/sharepoint.py
@@ -12,7 +12,7 @@
 from nesis.api.core.util import http, clean_control, isblank
 import logging

-from nesis.api.core.models.entities import Document
+from nesis.api.core.models.entities import Document, Datasource
 from nesis.api.core.services.util import (
     save_document,
     get_document,
@@ -29,7 +29,7 @@
 def fetch_documents(
-    connection: Dict[str, str],
+    datasource: Datasource,
     rag_endpoint: str,
     http_client: http.HttpClient,
     metadata: Dict[str, Any],
@@ -37,6 +37,7 @@
 ) -> None:

     try:
+        connection = datasource.connection
         site_url = connection.get("endpoint")
         client_id = connection.get("client_id")
         tenant = connection.get("tenant_id")
@@ -55,7 +56,7 @@ def fetch_documents(
         _sync_sharepoint_documents(
             sp_context=_sharepoint_context,
-            connection=connection,
+            datasource=datasource,
             rag_endpoint=rag_endpoint,
             http_client=http_client,
             metadata=metadata,
@@ -72,7 +73,7 @@


 def _sync_sharepoint_documents(
-    sp_context, connection, rag_endpoint, http_client, metadata, cache_client
+    sp_context, datasource, rag_endpoint, http_client, metadata, cache_client
 ):
     try:
         _LOG.info(f"Initializing sharepoint syncing to endpoint {rag_endpoint}")
@@ -83,6 +84,7 @@ def _sync_sharepoint_documents(
         )

         # Data objects allow us to specify folder names
+        connection = datasource.connection
         sharepoint_folders = connection.get("dataobjects")
         if sharepoint_folders is None:
             _LOG.warning("Sharepoint folders are specified, so I can't do much")
@@ -102,7 +104,7 @@ def _sync_sharepoint_documents(
             _process_folder_files(
                 sharepoint_folder,
-                connection=connection,
+                datasource=datasource,
                 rag_endpoint=rag_endpoint,
                 http_client=http_client,
                 metadata=metadata,
@@ -130,7 +132,10 @@ def _sync_sharepoint_documents(
     )


-def _process_file(file, connection, rag_endpoint, http_client, metadata, cache_client):
+def _process_file(
+    file, datasource: Datasource, rag_endpoint, http_client, metadata, cache_client
+):
+    connection = datasource.connection
     site_url = connection.get("endpoint")
     parsed_site_url = urlparse(site_url)
     site_root_url = "{uri.scheme}://{uri.netloc}".format(uri=parsed_site_url)
@@ -149,7 +154,7 @@ def _process_file(file, connection, rag_endpoint, http_client, metadata, cache_c
     if cache_client.add(key=_lock_key, val=_lock_key, time=30 * 60):
         try:
             _sync_document(
-                connection=connection,
+                datasource=datasource,
                 rag_endpoint=rag_endpoint,
                 http_client=http_client,
                 metadata=_metadata,
@@ -162,14 +167,14 @@ def _process_file(file, connection, rag_endpoint, http_client, metadata, cache_c


 def _process_folder_files(
-    folder, connection, rag_endpoint, http_client, metadata, cache_client
+    folder, datasource, rag_endpoint, http_client, metadata, cache_client
 ):
     # process files in folder
     _files = folder.get_files(False).execute_query()
     for file in _files:
         _process_file(
             file=file,
-            connection=connection,
+            datasource=datasource,
             rag_endpoint=rag_endpoint,
             http_client=http_client,
             metadata=metadata,
@@ -178,12 +183,13 @@


 def _sync_document(
-    connection: dict,
+    datasource: Datasource,
     rag_endpoint: str,
     http_client: http.HttpClient,
     metadata: dict,
     file,
 ):
+    connection = datasource.connection
     site_url = connection["endpoint"]
     _metadata = metadata
@@ -260,6 +266,7 @@ def _sync_document(
                 filename=file.serverRelativeUrl,
                 base_uri=site_url,
                 rag_metadata=response_json,
+                datasource_id=datasource.uuid,
                 store_metadata={
                     "file_name": file.name,
                     "file_url": file.serverRelativeUrl,
@@ -274,7 +281,9 @@ def _sync_document(
         )
         _LOG.info(f"Done syncing object {file.name} in at {file.serverRelativeUrl}")
     except Exception as ex:
-        _LOG.warning(f"Error when getting and ingesting file {file.name} - {ex}")
+        _LOG.warning(
+            f"Error when getting and ingesting file {file.name}", exc_info=True
+        )


 def _unsync_sharepoint_documents(sp_context, http_client, rag_endpoint, connection):
diff --git a/nesis/api/core/services/util.py b/nesis/api/core/services/util.py
index 743c321..566aa93 100644
--- a/nesis/api/core/services/util.py
+++ b/nesis/api/core/services/util.py
@@ -73,6 +73,7 @@ def save_document(**kwargs) -> Document:
         store_metadata=kwargs["store_metadata"],
         base_uri=kwargs["base_uri"],
         last_modified=kwargs["last_modified"],
+        datasource_id=kwargs["datasource_id"],
     )

     session = kwargs.get("session")
diff --git a/nesis/api/core/tasks/document_management.py b/nesis/api/core/tasks/document_management.py
index 72681d5..890babd 100644
--- a/nesis/api/core/tasks/document_management.py
+++ b/nesis/api/core/tasks/document_management.py
@@ -52,19 +52,12 @@ def ingest_datasource(**kwargs) -> None:

         case DatasourceType.SHAREPOINT:
             sharepoint.fetch_documents(
-                connection=datasource.connection,
+                datasource=datasource,
                 rag_endpoint=rag_endpoint,
                 http_client=http_client,
                 cache_client=cache_client,
                 metadata={"datasource": datasource.name},
             )
-        case DatasourceType.GOOGLE_DRIVE:
-            google_drive.fetch_documents(
-                connection=datasource.connection,
-                rag_endpoint=rag_endpoint,
-                http_client=http_client,
-                metadata={"datasource": datasource.name},
-            )
         case DatasourceType.WINDOWS_SHARE:
             samba.fetch_documents(
                 connection=datasource.connection,
@@ -75,7 +68,7 @@ def ingest_datasource(**kwargs) -> None:
             )
         case DatasourceType.S3:
             s3.fetch_documents(
-                connection=datasource.connection,
+                datasource=datasource,
                 rag_endpoint=rag_endpoint,
                 http_client=http_client,
                 metadata={"datasource": datasource.name},
diff --git a/nesis/api/core/util/http.py b/nesis/api/core/util/http.py
index b974a1e..c8b6388 100644
--- a/nesis/api/core/util/http.py
+++ b/nesis/api/core/util/http.py
@@ -12,6 +12,7 @@
 import logging

 from nesis.api.core.util import clean_control
+from nesis.api.core.util.concurrency import IOBoundPool


 class HttpClient(object):
@@ -23,13 +24,6 @@ def __init__(self, config):
         self._config = config
         self._cache = memcache.Client(config["memcache"]["hosts"], debug=1)
         self._LOG = logging.getLogger(self.__module__ + "." + self.__class__.__name__)
-        try:
-            max_workers = self._config["http"]["workers"]["count"]
-        except KeyError:
-            max_workers = os.environ.get("NESIS_API_HTTP_WORKERS_COUNT", 10)
-        self._worker_executor_pool = concurrent.futures.ThreadPoolExecutor(
-            max_workers=max_workers
-        )

     def get(self, url, params=None, headers=None, cookies=None, auth=None) -> str:
         with req.session() as session:
@@ -68,7 +62,7 @@ def deletes(self, urls, params=None, headers=None, cookies=None) -> None:
         futures = []
         for url in urls:
             futures.append(
-                self._worker_executor_pool.submit(
+                IOBoundPool.submit(
                     self.delete,
                     url=url,
                     params=params,
@@ -153,7 +147,3 @@ def upload(self, url, filepath, field, metadata: dict) -> Union[None, str]:
                 self._cache.delete(_lock_key)
         else:
             raise UserWarning(f"File {filepath} is already processing")
-
-    def __del__(self) -> None:
-        logging.info("Closing the worker pool")
-        self._worker_executor_pool.shutdown(wait=True)
diff --git a/nesis/api/tests/core/document_loaders/test_minio.py b/nesis/api/tests/core/document_loaders/test_minio.py
index 3f09b3f..8148d71 100644
--- a/nesis/api/tests/core/document_loaders/test_minio.py
+++ b/nesis/api/tests/core/document_loaders/test_minio.py
@@ -22,6 +22,7 @@
     DatasourceType,
     DatasourceStatus,
 )
+from nesis.api.core.util.dateutil import strptime


 @pytest.fixture
@@ -53,8 +54,8 @@ def test_ingest_documents(
     minio_instance: mock.MagicMock, cache: mock.MagicMock, session: Session
 ) -> None:
     data = {
-        "name": "s3 documents",
-        "engine": "s3",
+        "name": "minio documents",
+        "engine": "minio",
         "connection": {
             "endpoint": "https://s3.endpoint",
             "access_key": "",
@@ -274,10 +275,6 @@ def test_uningest_documents(
         datasource=datasource,
     )

-    # # No document records exist
-    # document_records = session.query(Document).all()
-    # assert 0 == len(document_records)
-
     minio_ingestor.run(
         metadata={"datasource": "documents"},
     )
@@ -373,3 +370,115 @@ def test_unextract_documents(
         minio_ingestor._extract_runner._extraction_store.Store
     ).all()
     assert len(documents) == initial_count - 1
+
+
+@mock.patch("nesis.api.core.document_loaders.minio.Minio")
+def test_update_ingest_documents(
+    client: mock.MagicMock, cache: mock.MagicMock, session: Session
+) -> None:
+    """
+    Test updating documents if they have been updated at the minio bucket end.
+ """ + + data = { + "name": "s3 documents", + "engine": "minio", + "connection": { + "endpoint": "http://localhost:4566", + "region": "us-east-1", + "dataobjects": "my-test-bucket", + }, + } + + datasource = Datasource( + name=data["name"], + connection=data["connection"], + source_type=DatasourceType.MINIO, + status=DatasourceStatus.ONLINE, + ) + + session.add(datasource) + session.commit() + + # The document record + + document = Document( + base_uri="http://localhost:4566", + document_id="d41d8cd98f00b204e9800998ecf8427e", + filename="invalid.pdf", + rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]}, + store_metadata={ + "bucket_name": "some-bucket", + "object_name": "file/path.pdf", + "last_modified": "2023-07-18 06:40:07", + }, + last_modified=strptime("2023-07-19 06:40:07"), + datasource_id=datasource.uuid, + ) + + session.add(document) + session.commit() + + http_client = mock.MagicMock() + http_client.upload.return_value = json.dumps({}) + minio_client = mock.MagicMock() + bucket = mock.MagicMock() + + client.return_value = minio_client + type(bucket).etag = mock.PropertyMock( + return_value="d41d8cd98f00b204e9800998ecf8427e" + ) + last_modified = datetime.datetime.now() + type(bucket).bucket_name = mock.PropertyMock(return_value="SomeName") + type(bucket).object_name = mock.PropertyMock(return_value="SomeName") + type(bucket).last_modified = mock.PropertyMock(return_value=last_modified) + type(bucket).size = mock.PropertyMock(return_value=1000) + type(bucket).version_id = mock.PropertyMock(return_value="2") + + minio_client.list_objects.return_value = [bucket] + + minio_ingestor = minio.MinioProcessor( + config=tests.config, + http_client=http_client, + cache_client=cache, + datasource=datasource, + ) + + minio_ingestor.run( + metadata={"datasource": "documents"}, + ) + + # The document would be deleted from the rag engine + _, upload_kwargs = http_client.deletes.call_args_list[0] + urls = upload_kwargs["urls"] + + assert ( + urls[0] + == f"http://localhost:8080/v1/ingest/documents/{document.rag_metadata['data'][0]['doc_id']}" + ) + documents = session.query(Document).all() + assert len(documents) == 1 + assert document.id != documents[0].id + + # And then re-ingested + _, upload_kwargs = http_client.upload.call_args_list[0] + url = upload_kwargs["url"] + file_path = upload_kwargs["filepath"] + metadata = upload_kwargs["metadata"] + field = upload_kwargs["field"] + + assert url == f"http://localhost:8080/v1/ingest/files" + assert field == "file" + ut.TestCase().assertDictEqual( + metadata, + { + "datasource": "documents", + "file_name": "my-test-bucket/SomeName", + "self_link": "http://localhost:4566/my-test-bucket/SomeName", + }, + ) + + # The document has now been updated + documents = session.query(Document).all() + assert len(documents) == 1 + assert documents[0].last_modified == last_modified diff --git a/nesis/api/tests/core/document_loaders/test_s3.py b/nesis/api/tests/core/document_loaders/test_s3.py index 68c59cd..d95f50d 100644 --- a/nesis/api/tests/core/document_loaders/test_s3.py +++ b/nesis/api/tests/core/document_loaders/test_s3.py @@ -108,7 +108,7 @@ def test_sync_documents( s3_client.get_paginator.return_value = paginator s3.fetch_documents( - connection=data["connection"], + datasource=datasource, http_client=http_client, metadata={"datasource": "documents"}, rag_endpoint="http://localhost:8080", @@ -154,6 +154,16 @@ def test_update_sync_documents( }, } + datasource = Datasource( + name=data["name"], + connection=data["connection"], + 
+        source_type=DatasourceType.SHAREPOINT,
+        status=DatasourceStatus.ONLINE,
+    )
+
+    session.add(datasource)
+    session.commit()
+
     document = Document(
         base_uri="http://localhost:4566",
         document_id="d41d8cd98f00b204e9800998ecf8427e",
@@ -197,7 +207,7 @@ def test_update_sync_documents(
     s3_client.get_paginator.return_value = paginator

     s3.fetch_documents(
-        connection=data["connection"],
+        datasource=datasource,
         http_client=http_client,
         metadata={"datasource": "documents"},
         rag_endpoint="http://localhost:8080",
@@ -257,8 +267,19 @@ def test_unsync_s3_documents(
         },
     }

+    datasource = Datasource(
+        name=data["name"],
+        connection=data["connection"],
+        source_type=DatasourceType.SHAREPOINT,
+        status=DatasourceStatus.ONLINE,
+    )
+
+    session.add(datasource)
+    session.commit()
+
     document = Document(
         base_uri="http://localhost:4566",
+        datasource_id=datasource.uuid,
         document_id=str(uuid.uuid4()),
         filename="invalid.pdf",
         rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]},
@@ -279,7 +300,7 @@ def test_unsync_s3_documents(
     assert len(documents) == 1

     s3.fetch_documents(
-        connection=data["connection"],
+        datasource=datasource,
         http_client=http_client,
         metadata={"datasource": "documents"},
         rag_endpoint="http://localhost:8080",
diff --git a/nesis/api/tests/core/document_loaders/test_sharepoint.py b/nesis/api/tests/core/document_loaders/test_sharepoint.py
index 27ac994..5ad3d91 100644
--- a/nesis/api/tests/core/document_loaders/test_sharepoint.py
+++ b/nesis/api/tests/core/document_loaders/test_sharepoint.py
@@ -82,7 +82,7 @@ def test_sync_sharepoint_documents(
     http_client.upload.return_value = json.dumps({})

     sharepoint.fetch_documents(
-        connection=data["connection"],
+        datasource=datasource,
         http_client=http_client,
         metadata={"datasource": "documents"},
         rag_endpoint="http://localhost:8080",
@@ -129,7 +129,18 @@ def test_sync_updated_sharepoint_documents(
         },
     }

+    datasource = Datasource(
+        name=data["name"],
+        connection=data["connection"],
+        source_type=DatasourceType.MINIO,
+        status=DatasourceStatus.ONLINE,
+    )
+
+    session.add(datasource)
+    session.commit()
+
     document = Document(
+        datasource_id=datasource.uuid,
         base_uri="https://ametnes.sharepoint.com/sites/nesis-test/",
         document_id="edu323-23423-23frs-234232",
         filename="sharepoint_file.pdf",
@@ -185,7 +196,7 @@ def test_sync_updated_sharepoint_documents(
     http_client.upload.return_value = json.dumps({})

     sharepoint.fetch_documents(
-        connection=data["connection"],
+        datasource=datasource,
         http_client=http_client,
         metadata={"datasource": "documents"},
         rag_endpoint="http://localhost:8080",
@@ -246,8 +257,19 @@ def test_unsync_sharepoint_documents(
         },
     }

+    datasource = Datasource(
+        name=data["name"],
+        connection=data["connection"],
+        source_type=DatasourceType.SHAREPOINT,
+        status=DatasourceStatus.ONLINE,
+    )
+
+    session.add(datasource)
+    session.commit()
+
     document = Document(
         base_uri="https://ametnes.sharepoint.com/sites/nesis-test/",
+        datasource_id=datasource.uuid,
         document_id="edu323-23423-23frs",
         filename="Test_file.pdf",
         rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]},
@@ -297,7 +319,7 @@ def test_unsync_sharepoint_documents(
     assert len(documents) == 1

     sharepoint.fetch_documents(
-        connection=data["connection"],
+        datasource=datasource,
         http_client=http_client,
         metadata={"datasource": "documents"},
         rag_endpoint="http://localhost:8080",
diff --git a/nesis/api/tests/tasks/test_document_management.py b/nesis/api/tests/tasks/test_document_management.py
index 9d36e55..c8bf7a6 100644
--- a/nesis/api/tests/tasks/test_document_management.py
+++ b/nesis/api/tests/tasks/test_document_management.py
@@ -197,7 +197,8 @@ def test_ingest_datasource_s3(
     assert (
         kwargs_sync_samba_documents["rag_endpoint"] == tests.config["rag"]["endpoint"]
     )
-    tc.assertDictEqual(kwargs_sync_samba_documents["connection"], datasource.connection)
+    assert kwargs_sync_samba_documents.get("datasource") is not None
+
     tc.assertDictEqual(
         kwargs_sync_samba_documents["metadata"], {"datasource": datasource.name}
     )
@@ -249,9 +250,8 @@ def test_ingest_datasource_sharepoint(
         kwargs_sync_sharepoint_documents["rag_endpoint"]
         == tests.config["rag"]["endpoint"]
     )
-    tc.assertDictEqual(
-        kwargs_sync_sharepoint_documents["connection"], datasource.connection
-    )
+    assert kwargs_sync_sharepoint_documents.get("datasource") is not None
+
     tc.assertDictEqual(
         kwargs_sync_sharepoint_documents["metadata"], {"datasource": datasource.name}
     )
diff --git a/nesis/rag/settings.yaml b/nesis/rag/settings.yaml
index 1c91b70..d9ea12b 100644
--- a/nesis/rag/settings.yaml
+++ b/nesis/rag/settings.yaml
@@ -22,7 +22,7 @@ embedding:

 vectorstore:
   database: pgvector
-  similarity_top_k: ${NESIS_RAG_VECTORSTORE_SIMILARITY_TOP_K:5}
+  similarity_top_k: ${NESIS_RAG_VECTORSTORE_SIMILARITY_TOP_K:10}

 pgvector:
   url: ${NESIS_RAG_PGVECTOR_URL:postgresql://postgres:password@localhost:65432/nesis}
diff --git a/version.txt b/version.txt
index 12023bf..b1e80bb 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-0.1.3-rc11
+0.1.3