From 7dd39d8433aa3226b23bbddec57bf28dcc62cf6a Mon Sep 17 00:00:00 2001 From: Michael Sekamanya Date: Fri, 16 Aug 2024 23:25:52 -0700 Subject: [PATCH] Refactor sharepoint --- nesis/api/core/document_loaders/sharepoint.py | 437 +++++++----------- nesis/api/core/tasks/document_management.py | 10 +- .../core/document_loaders/test_sharepoint.py | 59 ++- 3 files changed, 211 insertions(+), 295 deletions(-) diff --git a/nesis/api/core/document_loaders/sharepoint.py b/nesis/api/core/document_loaders/sharepoint.py index 1ab496a..94f768e 100644 --- a/nesis/api/core/document_loaders/sharepoint.py +++ b/nesis/api/core/document_loaders/sharepoint.py @@ -10,6 +10,7 @@ from office365.sharepoint.client_context import ClientContext from office365.runtime.client_request_exception import ClientRequestException +from nesis.api.core.document_loaders.loader_helper import DocumentProcessor from nesis.api.core.util import http, clean_control, isblank import logging from nesis.api.core.models.entities import Document, Datasource @@ -28,278 +29,195 @@ _LOG = logging.getLogger(__name__) -def fetch_documents( - datasource: Datasource, - rag_endpoint: str, - http_client: http.HttpClient, - metadata: Dict[str, Any], - cache_client: memcache.Client, -) -> None: - try: +class Processor(DocumentProcessor): + def __init__( + self, + config, + http_client: http.HttpClient, + cache_client: memcache.Client, + datasource: Datasource, + ): + super().__init__(config, http_client, datasource) + self._config = config + self._http_client = http_client + self._cache_client = cache_client + self._datasource = datasource + self._futures = [] - connection = datasource.connection - site_url = connection.get("endpoint") - client_id = connection.get("client_id") - tenant = connection.get("tenant_id") - thumbprint = connection.get("thumbprint") - - with tempfile.NamedTemporaryFile(dir=tempfile.gettempdir()) as tmp: - cert_path = f"{str(pathlib.Path(tmp.name).absolute())}-{uuid.uuid4()}.key" - pathlib.Path(cert_path).write_text(connection["certificate"]) - - _sharepoint_context = ClientContext(site_url).with_client_certificate( - tenant=tenant, - client_id=client_id, - thumbprint=thumbprint, - cert_path=cert_path, - ) + def run(self, metadata: Dict[str, Any]): - _sync_sharepoint_documents( - sp_context=_sharepoint_context, - datasource=datasource, - rag_endpoint=rag_endpoint, - http_client=http_client, - metadata=metadata, - cache_client=cache_client, - ) - _unsync_sharepoint_documents( - sp_context=_sharepoint_context, - connection=connection, - rag_endpoint=rag_endpoint, - http_client=http_client, - ) - except Exception as ex: - _LOG.exception(f"Error fetching sharepoint documents - {ex}") + try: + connection = self._datasource.connection + site_url = connection.get("endpoint") + client_id = connection.get("client_id") + tenant = connection.get("tenant_id") + thumbprint = connection.get("thumbprint") -def _sync_sharepoint_documents( - sp_context, datasource, rag_endpoint, http_client, metadata, cache_client -): - try: - _LOG.info(f"Initializing sharepoint syncing to endpoint {rag_endpoint}") + with tempfile.NamedTemporaryFile(dir=tempfile.gettempdir()) as tmp: + cert_path = ( + f"{str(pathlib.Path(tmp.name).absolute())}-{uuid.uuid4()}.key" + ) + pathlib.Path(cert_path).write_text(connection["certificate"]) - if sp_context is None: - raise Exception( - "Sharepoint context is null, cannot proceed with document processing." 
- ) + _sharepoint_context = ClientContext(site_url).with_client_certificate( + tenant=tenant, + client_id=client_id, + thumbprint=thumbprint, + cert_path=cert_path, + ) + + self._sync_sharepoint_documents( + sp_context=_sharepoint_context, + metadata=metadata, + ) + self._unsync_sharepoint_documents( + sp_context=_sharepoint_context, + ) + except Exception as ex: + _LOG.exception(f"Error fetching sharepoint documents - {ex}") - # Data objects allow us to specify folder names - connection = datasource.connection - sharepoint_folders = connection.get("dataobjects") - if sharepoint_folders is None: - _LOG.warning("Sharepoint folders are specified, so I can't do much") + def _sync_sharepoint_documents(self, sp_context, metadata): + try: - sp_folders = sharepoint_folders.split(",") + if sp_context is None: + raise Exception( + "Sharepoint context is null, cannot proceed with document processing." + ) - root_folder = sp_context.web.default_document_library().root_folder + # Data objects allow us to specify folder names + connection = self._datasource.connection + sharepoint_folders = connection.get("dataobjects") + if sharepoint_folders is None: + _LOG.warning("Sharepoint folders are specified, so I can't do much") - for folder_name in sp_folders: - sharepoint_folder = root_folder.folders.get_by_path(folder_name) + sp_folders = sharepoint_folders.split(",") - if sharepoint_folder is None: - _LOG.warning( - f"Cannot retrieve Sharepoint folder {sharepoint_folder} proceeding to process other folders" - ) - continue + root_folder = sp_context.web.default_document_library().root_folder - _process_folder_files( - sharepoint_folder, - datasource=datasource, - rag_endpoint=rag_endpoint, - http_client=http_client, - metadata=metadata, - cache_client=cache_client, - ) + for folder_name in sp_folders: + sharepoint_folder = root_folder.folders.get_by_path(folder_name) + + if sharepoint_folder is None: + _LOG.warning( + f"Cannot retrieve Sharepoint folder {sharepoint_folder} proceeding to process other folders" + ) + continue - # Recursively get all the child folders - _child_folders_recursive = sharepoint_folder.get_folders( - True - ).execute_query() - for _child_folder in _child_folders_recursive: - _process_folder_files( - _child_folder, - connection=connection, - rag_endpoint=rag_endpoint, - http_client=http_client, + self._process_folder_files( + sharepoint_folder, metadata=metadata, - cache_client=cache_client, ) - _LOG.info(f"Completed syncing to endpoint {rag_endpoint}") - - except Exception as file_ex: - _LOG.exception( - f"Error fetching and updating documents - Error: {file_ex}", exc_info=True - ) - - -def _process_file( - file, datasource: Datasource, rag_endpoint, http_client, metadata, cache_client -): - connection = datasource.connection - site_url = connection.get("endpoint") - parsed_site_url = urlparse(site_url) - site_root_url = "{uri.scheme}://{uri.netloc}".format(uri=parsed_site_url) - self_link = f"{site_root_url}{file.serverRelativeUrl}" - _metadata = { - **(metadata or {}), - "file_name": file.name, - "self_link": self_link, - } - """ - We use memcache's add functionality to implement a shared lock to allow for multiple instances - operating - """ - _lock_key = clean_control(f"{__name__}/locks/{self_link}") - if cache_client.add(key=_lock_key, val=_lock_key, time=30 * 60): - try: - _sync_document( - datasource=datasource, - rag_endpoint=rag_endpoint, - http_client=http_client, - metadata=_metadata, - file=file, + # Recursively get all the child folders + _child_folders_recursive = 
sharepoint_folder.get_folders( + True + ).execute_query() + for _child_folder in _child_folders_recursive: + self._process_folder_files( + _child_folder, + metadata=metadata, + ) + + except Exception as file_ex: + _LOG.exception( + f"Error fetching and updating documents - Error: {file_ex}", + exc_info=True, ) - finally: - cache_client.delete(_lock_key) - else: - _LOG.info(f"Document {self_link} is already processing") - - -def _process_folder_files( - folder, datasource, rag_endpoint, http_client, metadata, cache_client -): - # process files in folder - _files = folder.get_files(False).execute_query() - for file in _files: - _process_file( - file=file, - datasource=datasource, - rag_endpoint=rag_endpoint, - http_client=http_client, - metadata=metadata, - cache_client=cache_client, - ) - - -def _sync_document( - datasource: Datasource, - rag_endpoint: str, - http_client: http.HttpClient, - metadata: dict, - file, -): - connection = datasource.connection - site_url = connection["endpoint"] - _metadata = metadata - - with tempfile.NamedTemporaryFile( - dir=tempfile.gettempdir(), - ) as tmp: - key_parts = file.serverRelativeUrl.split("/") - - path_to_tmp = f"{str(pathlib.Path(tmp.name).absolute())}-{key_parts[-1]}" - try: - _LOG.info( - f"Starting syncing file {file.name} from {file.serverRelativeUrl}" - ) - # Write item to file - downloaded_file_name = path_to_tmp # os.path.join(path_to_tmp, file.name) - # How can we refine this for efficiency - with open(downloaded_file_name, "wb") as local_file: - file.download(local_file).execute_query() - - document: Document = get_document(document_id=file.unique_id) - if document and document.base_uri == site_url: - store_metadata = document.store_metadata - if store_metadata and store_metadata.get("last_modified"): - last_modified = store_metadata["last_modified"] - if ( - not strptime(date_string=last_modified).replace(tzinfo=None) - < file.time_last_modified - ): - _LOG.debug( - f"Skipping sharepoint document {file.name} already up to date" - ) - return - rag_metadata: dict = document.rag_metadata - if rag_metadata is None: - return - for document_data in rag_metadata.get("data") or []: - try: - un_ingest_file( - http_client=http_client, - endpoint=rag_endpoint, - doc_id=document_data["doc_id"], - ) - except: - _LOG.warning( - f"Failed to delete document {document_data['doc_id']}" - ) - try: - delete_document(document_id=document.id) - except: - _LOG.warning( - f"Failed to delete document {file.name}'s record. Continuing anyway..." 
- ) + def _process_file( + self, + file, + metadata, + ): + connection = self._datasource.connection + site_url = connection.get("endpoint") + parsed_site_url = urlparse(site_url) + site_root_url = "{uri.scheme}://{uri.netloc}".format(uri=parsed_site_url) + self_link = f"{site_root_url}{file.serverRelativeUrl}" + _metadata = { + **(metadata or {}), + "file_name": file.name, + "self_link": self_link, + } + + """ + We use memcache's add functionality to implement a shared lock to allow for multiple instances + operating + """ + _lock_key = clean_control(f"{__name__}/locks/{self_link}") + if self._cache_client.add(key=_lock_key, val=_lock_key, time=30 * 60): try: - response = ingest_file( - http_client=http_client, - endpoint=rag_endpoint, + self._sync_document( metadata=_metadata, - file_path=downloaded_file_name, + file=file, ) - response_json = json.loads(response) + finally: + self._cache_client.delete(_lock_key) + else: + _LOG.info(f"Document {self_link} is already processing") + + def _process_folder_files(self, folder, metadata): + # process files in folder + _files = folder.get_files(False).execute_query() + for file in _files: + self._process_file( + file=file, + metadata=metadata, + ) - except ValueError: - _LOG.warning( - f"File {downloaded_file_name} ingestion failed", exc_info=True + def _sync_document( + self, + metadata: dict, + file, + ): + connection = self._datasource.connection + site_url = connection["endpoint"] + _metadata = metadata + + with tempfile.NamedTemporaryFile( + dir=tempfile.gettempdir(), + ) as tmp: + key_parts = file.serverRelativeUrl.split("/") + + path_to_tmp = f"{str(pathlib.Path(tmp.name).absolute())}-{key_parts[-1]}" + + try: + _LOG.info( + f"Starting syncing file {file.name} from {file.serverRelativeUrl}" ) - response_json = {} - except UserWarning: - _LOG.debug(f"File {downloaded_file_name} is already processing") - return - - save_document( - document_id=file.unique_id, - filename=file.serverRelativeUrl, - base_uri=site_url, - rag_metadata=response_json, - datasource_id=datasource.uuid, - store_metadata={ - "file_name": file.name, - "file_url": file.serverRelativeUrl, - "etag": file.unique_id, - "size": file.length, - "author": file.author, - "last_modified": file.time_last_modified.strftime( - DEFAULT_DATETIME_FORMAT - ), - }, - last_modified=file.time_last_modified, - ) - _LOG.info(f"Done syncing object {file.name} in at {file.serverRelativeUrl}") - except Exception as ex: - _LOG.warning( - f"Error when getting and ingesting file {file.name}", exc_info=True - ) + # Write item to file + # How can we refine this for efficiency + with open(path_to_tmp, "wb") as local_file: + file.download(local_file).execute_query() -def _unsync_sharepoint_documents(sp_context, http_client, rag_endpoint, connection): + self.sync( + site_url, + path_to_tmp, + last_modified=file.time_last_modified, + metadata=metadata, + store_metadata={ + "filename": file.name, + "file_url": file.serverRelativeUrl, + "etag": file.unique_id, + "size": file.length, + "author": file.author, + "last_modified": file.time_last_modified.strftime( + DEFAULT_DATETIME_FORMAT + ), + }, + ) + except: + _LOG.warning( + f"Error when getting and ingesting file {file.name}", exc_info=True + ) - try: - site_url = connection.get("endpoint") + def _unsync_sharepoint_documents(self, sp_context): - if sp_context is None: - raise Exception( - "Sharepoint context is null, cannot proceed with document processing." 
- ) + def clean(**kwargs): + store_metadata = kwargs["store_metadata"] - documents = get_documents(base_uri=site_url) - for document in documents: - store_metadata = document.store_metadata - rag_metadata = document.rag_metadata file_url = store_metadata["file_url"] try: # Check that the file still exists on the sharepoint server @@ -308,27 +226,14 @@ def _unsync_sharepoint_documents(sp_context, http_client, rag_endpoint, connecti ).get().execute_query() except ClientRequestException as e: if e.response.status_code == 404: - # File no longer exists on sharepoint server so we need to delete from model - try: - http_client.deletes( - urls=[ - f"{rag_endpoint}/v1/ingest/documents/{document_data['doc_id']}" - for document_data in rag_metadata.get("data") or [] - ] - ) - _LOG.info(f"Deleting document {document.filename}") - delete_document(document_id=document.id) - except: - _LOG.warning( - f"Failed to delete document {document.filename}", - exc_info=True, - ) - except Exception as ex: - _LOG.warning( - f"Failed to retrieve file {file_url} from sharepoint - {ex}" - ) - except: - _LOG.warning("Error fetching and updating documents", exc_info=True) + return True + else: + raise + + try: + self.unsync(clean=clean) + except: + _LOG.warning("Error fetching and updating documents", exc_info=True) def validate_connection_info(connection: Dict[str, Any]) -> Dict[str, Any]: diff --git a/nesis/api/core/tasks/document_management.py b/nesis/api/core/tasks/document_management.py index 552688d..c6e06bd 100644 --- a/nesis/api/core/tasks/document_management.py +++ b/nesis/api/core/tasks/document_management.py @@ -51,13 +51,15 @@ def ingest_datasource(**kwargs) -> None: minio_ingestor.run(metadata=metadata) case DatasourceType.SHAREPOINT: - sharepoint.fetch_documents( - datasource=datasource, - rag_endpoint=rag_endpoint, + + ingestor = sharepoint.Processor( + config=config, http_client=http_client, cache_client=cache_client, - metadata={"datasource": datasource.name}, + datasource=datasource, ) + + ingestor.run(metadata=metadata) case DatasourceType.WINDOWS_SHARE: ingestor = samba.Processor( diff --git a/nesis/api/tests/core/document_loaders/test_sharepoint.py b/nesis/api/tests/core/document_loaders/test_sharepoint.py index 5ad3d91..fcc48ec 100644 --- a/nesis/api/tests/core/document_loaders/test_sharepoint.py +++ b/nesis/api/tests/core/document_loaders/test_sharepoint.py @@ -22,10 +22,11 @@ DatasourceType, DatasourceStatus, ) +from nesis.api.core.util.dateutil import strptime @mock.patch("nesis.api.core.document_loaders.sharepoint.ClientContext") -def test_sync_sharepoint_documents( +def test_ingest( client_context: mock.MagicMock, cache: mock.MagicMock, session: Session ) -> None: data = { @@ -81,12 +82,14 @@ def test_sync_sharepoint_documents( http_client = mock.MagicMock() http_client.upload.return_value = json.dumps({}) - sharepoint.fetch_documents( - datasource=datasource, + ingestor = sharepoint.Processor( + config=tests.config, http_client=http_client, - metadata={"datasource": "documents"}, - rag_endpoint="http://localhost:8080", cache_client=cache, + datasource=datasource, + ) + ingestor.run( + metadata={"datasource": "documents"}, ) _, upload_kwargs = http_client.upload.call_args_list[0] @@ -109,7 +112,7 @@ def test_sync_sharepoint_documents( @mock.patch("nesis.api.core.document_loaders.sharepoint.ClientContext") -def test_sync_updated_sharepoint_documents( +def test_updated( sharepoint_context: mock.MagicMock, cache: mock.MagicMock, session: Session ) -> None: """ @@ -139,10 +142,13 @@ def 
test_sync_updated_sharepoint_documents( session.add(datasource) session.commit() + self_link = "https://ametnes.sharepoint.com/sites/nesit-test/Shared Documents/sharepoint_file.pdf" document = Document( datasource_id=datasource.uuid, base_uri="https://ametnes.sharepoint.com/sites/nesis-test/", - document_id="edu323-23423-23frs-234232", + document_id=str( + uuid.uuid5(uuid.NAMESPACE_DNS, f"{datasource.uuid}:{self_link}") + ), filename="sharepoint_file.pdf", rag_metadata={"data": [{"doc_id": str(uuid.uuid4())}]}, store_metadata={ @@ -153,7 +159,7 @@ def test_sync_updated_sharepoint_documents( "author": "author_name", "last_modified": "2024-01-10 06:40:07", }, - last_modified=datetime.datetime.utcnow(), + last_modified=strptime("2024-01-10 06:40:07"), ) session.add(document) @@ -180,7 +186,7 @@ def test_sync_updated_sharepoint_documents( ) type(file_mock).time_last_modified = mock.PropertyMock( return_value=datetime.datetime.strptime( - "2024-04-10 06:40:07", "%Y-%m-%d %H:%M:%S" + "2024-04-11 06:40:07", "%Y-%m-%d %H:%M:%S" ) ) type(file_mock).length = mock.PropertyMock(return_value=2023) @@ -195,22 +201,23 @@ def test_sync_updated_sharepoint_documents( http_client = mock.MagicMock() http_client.upload.return_value = json.dumps({}) - sharepoint.fetch_documents( - datasource=datasource, + ingestor = sharepoint.Processor( + config=tests.config, http_client=http_client, - metadata={"datasource": "documents"}, - rag_endpoint="http://localhost:8080", cache_client=cache, + datasource=datasource, + ) + ingestor.run( + metadata={"datasource": "documents"}, ) # The document would be deleted from the rag engine - _, upload_kwargs = http_client.delete.call_args_list[0] - url = upload_kwargs["url"] + _, upload_kwargs = http_client.deletes.call_args_list[0] + urls = upload_kwargs["urls"] - assert ( - url - == f"http://localhost:8080/v1/ingest/documents/{document.rag_metadata['data'][0]['doc_id']}" - ) + assert urls == [ + f"http://localhost:8080/v1/ingest/documents/{document.rag_metadata['data'][0]['doc_id']}" + ] # And then re-ingested _, upload_kwargs = http_client.upload.call_args_list[0] @@ -234,11 +241,11 @@ def test_sync_updated_sharepoint_documents( # The document has now been updated documents = session.query(Document).all() assert len(documents) == 1 - assert documents[0].store_metadata["last_modified"] == "2024-04-10 06:40:07" + assert documents[0].store_metadata["last_modified"] == "2024-04-11 06:40:07" @mock.patch("nesis.api.core.document_loaders.sharepoint.ClientContext") -def test_unsync_sharepoint_documents( +def test_uningest( sharepoint_context: mock.MagicMock, cache: mock.MagicMock, session: Session ) -> None: """ @@ -318,12 +325,14 @@ def test_unsync_sharepoint_documents( documents = session.query(Document).all() assert len(documents) == 1 - sharepoint.fetch_documents( - datasource=datasource, + ingestor = sharepoint.Processor( + config=tests.config, http_client=http_client, - metadata={"datasource": "documents"}, - rag_endpoint="http://localhost:8080", cache_client=cache, + datasource=datasource, + ) + ingestor.run( + metadata={"datasource": "documents"}, ) _, upload_kwargs = http_client.deletes.call_args_list[0]
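
Reviewer note: the refactor moves the ingest/un-ingest bookkeeping into the shared DocumentProcessor helper imported from nesis.api.core.document_loaders.loader_helper, which is not part of this patch. The sketch below is only an illustration of the contract the new sharepoint.Processor appears to rely on; the method names match the call sites in this diff (super().__init__(config, http_client, datasource), self.sync(...), self.unsync(clean=...)), but the parameter types and behaviour notes are inferred rather than copied from the real helper.

# Illustrative sketch only: the real DocumentProcessor lives in
# nesis/api/core/document_loaders/loader_helper.py and is not shown here.
from typing import Any, Callable, Dict


class DocumentProcessor:
    """Assumed base-class contract that sharepoint.Processor builds on."""

    def __init__(self, config, http_client, datasource) -> None:
        self._config = config
        self._http_client = http_client
        self._datasource = datasource

    def sync(
        self,
        base_uri: str,
        file_path: str,
        last_modified,
        metadata: Dict[str, Any],
        store_metadata: Dict[str, Any],
    ) -> None:
        # Inferred behaviour: skip documents that are already up to date,
        # un-ingest any stale copy from the RAG engine, ingest file_path and
        # persist the Document record (the logic previously inlined in
        # _sync_document).
        raise NotImplementedError

    def unsync(self, clean: Callable[..., bool]) -> None:
        # Inferred behaviour: iterate the stored documents for this datasource
        # and delete those for which clean(store_metadata=...) returns True,
        # i.e. the source file no longer exists upstream (the logic previously
        # inlined in _unsync_sharepoint_documents).
        raise NotImplementedError

With that contract in place, sharepoint.py keeps only the SharePoint-specific concerns (certificate-based authentication, folder traversal and file download), mirroring the samba and minio processors that already expose Processor.run(metadata=...).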
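
Reviewer note: _process_file keeps the existing memcached-based locking so that several workers can run concurrently without processing the same document twice. The snippet below is a standalone illustration of that pattern, not code from this repository; the server address and lock key are placeholders (the patch builds the key from the document self link and passes it through clean_control first). memcached's add() succeeds only when the key does not already exist, which makes it usable as a best-effort, self-expiring shared lock.

# Standalone illustration of the add()-as-lock pattern used in _process_file.
import memcache

cache = memcache.Client(["127.0.0.1:11211"])  # placeholder address

lock_key = "sharepoint/locks/sites/nesis-test/some_file.pdf"  # placeholder key
if cache.add(key=lock_key, val=lock_key, time=30 * 60):  # lock expires after 30 minutes
    try:
        print("lock acquired, processing document")
    finally:
        cache.delete(lock_key)  # release early so other workers need not wait for expiry
else:
    print("another worker is already processing this document")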