diff --git a/backend/danswer/connectors/confluence/connector.py b/backend/danswer/connectors/confluence/connector.py index 9f1fe2e05f5..5c58cf194fa 100644 --- a/backend/danswer/connectors/confluence/connector.py +++ b/backend/danswer/connectors/confluence/connector.py @@ -70,7 +70,7 @@ def __init__( ) -> None: self.batch_size = batch_size self.continue_on_failure = continue_on_failure - self.confluence_client: OnyxConfluence | None = None + self._confluence_client: OnyxConfluence | None = None self.is_cloud = is_cloud # Remove trailing slash from wiki_base if present @@ -97,13 +97,21 @@ def __init__( self.cql_label_filter = "" if labels_to_skip: labels_to_skip = list(set(labels_to_skip)) - comma_separated_labels = ",".join(f"'{label}'" for label in labels_to_skip) + comma_separated_labels = ",".join( + f"'{quote(label)}'" for label in labels_to_skip + ) self.cql_label_filter = f" and label not in ({comma_separated_labels})" + @property + def confluence_client(self) -> OnyxConfluence: + if self._confluence_client is None: + raise ConnectorMissingCredentialError("Confluence") + return self._confluence_client + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: # see https://github.com/atlassian-api/atlassian-python-api/blob/master/atlassian/rest_client.py # for a list of other hidden constructor args - self.confluence_client = build_confluence_client( + self._confluence_client = build_confluence_client( credentials_json=credentials, is_cloud=self.is_cloud, wiki_base=self.wiki_base, @@ -111,25 +119,22 @@ def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None return None def _get_comment_string_for_page_id(self, page_id: str) -> str: - if self.confluence_client is None: - raise ConnectorMissingCredentialError("Confluence") - comment_string = "" comment_cql = f"type=comment and container='{page_id}'" comment_cql += self.cql_label_filter expand = ",".join(_COMMENT_EXPANSION_FIELDS) - for comments in 
self.confluence_client.paginated_cql_page_retrieval( + for comment in self.confluence_client.paginated_cql_retrieval( cql=comment_cql, expand=expand, ): - for comment in comments: - comment_string += "\nComment:\n" - comment_string += extract_text_from_confluence_html( - confluence_client=self.confluence_client, - confluence_object=comment, - ) + comment_string += "\nComment:\n" + comment_string += extract_text_from_confluence_html( + confluence_client=self.confluence_client, + confluence_object=comment, + fetched_titles=set(), + ) return comment_string @@ -141,9 +146,6 @@ def _convert_object_to_document( If its a page, it extracts the text, adds the comments for the document text. If its an attachment, it just downloads the attachment and converts that into a document. """ - if self.confluence_client is None: - raise ConnectorMissingCredentialError("Confluence") - # The url and the id are the same object_url = build_confluence_document_id( self.wiki_base, confluence_object["_links"]["webui"], self.is_cloud @@ -153,16 +155,19 @@ def _convert_object_to_document( # Extract text from page if confluence_object["type"] == "page": object_text = extract_text_from_confluence_html( - self.confluence_client, confluence_object + confluence_client=self.confluence_client, + confluence_object=confluence_object, + fetched_titles={confluence_object.get("title", "")}, ) # Add comments to text object_text += self._get_comment_string_for_page_id(confluence_object["id"]) elif confluence_object["type"] == "attachment": object_text = attachment_to_content( - self.confluence_client, confluence_object + confluence_client=self.confluence_client, attachment=confluence_object ) if object_text is None: + # This only happens for attachments that are not parseable return None # Get space name @@ -193,44 +198,39 @@ def _convert_object_to_document( ) def _fetch_document_batches(self) -> GenerateDocumentsOutput: - if self.confluence_client is None: - raise 
ConnectorMissingCredentialError("Confluence") - doc_batch: list[Document] = [] confluence_page_ids: list[str] = [] page_query = self.cql_page_query + self.cql_label_filter + self.cql_time_filter # Fetch pages as Documents - for page_batch in self.confluence_client.paginated_cql_page_retrieval( + for page in self.confluence_client.paginated_cql_retrieval( cql=page_query, expand=",".join(_PAGE_EXPANSION_FIELDS), limit=self.batch_size, ): - for page in page_batch: - confluence_page_ids.append(page["id"]) - doc = self._convert_object_to_document(page) - if doc is not None: - doc_batch.append(doc) - if len(doc_batch) >= self.batch_size: - yield doc_batch - doc_batch = [] + confluence_page_ids.append(page["id"]) + doc = self._convert_object_to_document(page) + if doc is not None: + doc_batch.append(doc) + if len(doc_batch) >= self.batch_size: + yield doc_batch + doc_batch = [] # Fetch attachments as Documents for confluence_page_id in confluence_page_ids: attachment_cql = f"type=attachment and container='{confluence_page_id}'" attachment_cql += self.cql_label_filter # TODO: maybe should add time filter as well? 
- for attachments in self.confluence_client.paginated_cql_page_retrieval( + for attachment in self.confluence_client.paginated_cql_retrieval( cql=attachment_cql, expand=",".join(_ATTACHMENT_EXPANSION_FIELDS), ): - for attachment in attachments: - doc = self._convert_object_to_document(attachment) - if doc is not None: - doc_batch.append(doc) - if len(doc_batch) >= self.batch_size: - yield doc_batch - doc_batch = [] + doc = self._convert_object_to_document(attachment) + if doc is not None: + doc_batch.append(doc) + if len(doc_batch) >= self.batch_size: + yield doc_batch + doc_batch = [] if doc_batch: yield doc_batch @@ -255,52 +255,47 @@ def retrieve_all_slim_documents( start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None, ) -> GenerateSlimDocumentOutput: - if self.confluence_client is None: - raise ConnectorMissingCredentialError("Confluence") - doc_metadata_list: list[SlimDocument] = [] restrictions_expand = ",".join(_RESTRICTIONS_EXPANSION_FIELDS) page_query = self.cql_page_query + self.cql_label_filter - for pages in self.confluence_client.cql_paginate_all_expansions( + for page in self.confluence_client.cql_paginate_all_expansions( cql=page_query, expand=restrictions_expand, ): - for page in pages: - # If the page has restrictions, add them to the perm_sync_data - # These will be used by doc_sync.py to sync permissions - perm_sync_data = { - "restrictions": page.get("restrictions", {}), - "space_key": page.get("space", {}).get("key"), - } - + # If the page has restrictions, add them to the perm_sync_data + # These will be used by doc_sync.py to sync permissions + perm_sync_data = { + "restrictions": page.get("restrictions", {}), + "space_key": page.get("space", {}).get("key"), + } + + doc_metadata_list.append( + SlimDocument( + id=build_confluence_document_id( + self.wiki_base, + page["_links"]["webui"], + self.is_cloud, + ), + perm_sync_data=perm_sync_data, + ) + ) + attachment_cql = f"type=attachment and 
container='{page['id']}'" + attachment_cql += self.cql_label_filter + for attachment in self.confluence_client.cql_paginate_all_expansions( + cql=attachment_cql, + expand=restrictions_expand, + ): doc_metadata_list.append( SlimDocument( id=build_confluence_document_id( self.wiki_base, - page["_links"]["webui"], + attachment["_links"]["webui"], self.is_cloud, ), perm_sync_data=perm_sync_data, ) ) - attachment_cql = f"type=attachment and container='{page['id']}'" - attachment_cql += self.cql_label_filter - for attachments in self.confluence_client.cql_paginate_all_expansions( - cql=attachment_cql, - expand=restrictions_expand, - ): - for attachment in attachments: - doc_metadata_list.append( - SlimDocument( - id=build_confluence_document_id( - self.wiki_base, - attachment["_links"]["webui"], - self.is_cloud, - ), - perm_sync_data=perm_sync_data, - ) - ) - yield doc_metadata_list - doc_metadata_list = [] + yield doc_metadata_list + doc_metadata_list = [] diff --git a/backend/danswer/connectors/confluence/onyx_confluence.py b/backend/danswer/connectors/confluence/onyx_confluence.py index c01f45dea6a..c6a4d3857b1 100644 --- a/backend/danswer/connectors/confluence/onyx_confluence.py +++ b/backend/danswer/connectors/confluence/onyx_confluence.py @@ -20,6 +20,10 @@ RATE_LIMIT_MESSAGE_LOWERCASE = "Rate limit exceeded".lower() +# https://jira.atlassian.com/browse/CONFCLOUD-76433 +_PROBLEMATIC_EXPANSIONS = "body.storage.value" +_REPLACEMENT_EXPANSIONS = "body.view.value" + class ConfluenceRateLimitError(Exception): pass @@ -141,7 +145,7 @@ def _wrap_methods(self) -> None: def _paginate_url( self, url_suffix: str, limit: int | None = None - ) -> Iterator[list[dict[str, Any]]]: + ) -> Iterator[dict[str, Any]]: """ This will paginate through the top level query. 
""" @@ -153,46 +157,43 @@ def _paginate_url( while url_suffix: try: + logger.debug(f"Making confluence call to {url_suffix}") next_response = self.get(url_suffix) except Exception as e: - logger.exception("Error in danswer_cql: \n") - raise e - yield next_response.get("results", []) - url_suffix = next_response.get("_links", {}).get("next") + logger.warning(f"Error in confluence call to {url_suffix}") - def paginated_groups_retrieval( - self, - limit: int | None = None, - ) -> Iterator[list[dict[str, Any]]]: - return self._paginate_url("rest/api/group", limit) + # If the problematic expansion is in the url, replace it + # with the replacement expansion and try again + # If that fails, raise the error + if _PROBLEMATIC_EXPANSIONS not in url_suffix: + logger.exception(f"Error in confluence call to {url_suffix}") + raise e + logger.warning( + f"Replacing {_PROBLEMATIC_EXPANSIONS} with {_REPLACEMENT_EXPANSIONS}" + " and trying again." + ) + url_suffix = url_suffix.replace( + _PROBLEMATIC_EXPANSIONS, + _REPLACEMENT_EXPANSIONS, + ) + continue - def paginated_group_members_retrieval( - self, - group_name: str, - limit: int | None = None, - ) -> Iterator[list[dict[str, Any]]]: - group_name = quote(group_name) - return self._paginate_url(f"rest/api/group/{group_name}/member", limit) + # yield the results individually + yield from next_response.get("results", []) - def paginated_cql_user_retrieval( - self, - cql: str, - expand: str | None = None, - limit: int | None = None, - ) -> Iterator[list[dict[str, Any]]]: - expand_string = f"&expand={expand}" if expand else "" - return self._paginate_url( - f"rest/api/search/user?cql={cql}{expand_string}", limit - ) + url_suffix = next_response.get("_links", {}).get("next") - def paginated_cql_page_retrieval( + def paginated_cql_retrieval( self, cql: str, expand: str | None = None, limit: int | None = None, - ) -> Iterator[list[dict[str, Any]]]: + ) -> Iterator[dict[str, Any]]: + """ + The content/search endpoint can be used to fetch 
pages, attachments, and comments. + """ + expand_string = f"&expand={expand}" if expand else "" - return self._paginate_url( + yield from self._paginate_url( f"rest/api/content/search?cql={cql}{expand_string}", limit ) @@ -201,7 +202,7 @@ def cql_paginate_all_expansions( cql: str, expand: str | None = None, limit: int | None = None, - ) -> Iterator[list[dict[str, Any]]]: + ) -> Iterator[dict[str, Any]]: """ This function will paginate through the top level query first, then paginate through all of the expansions. @@ -221,6 +222,44 @@ def _traverse_and_update(data: dict | list) -> None: for item in data: _traverse_and_update(item) - for results in self.paginated_cql_page_retrieval(cql, expand, limit): - _traverse_and_update(results) - yield results + for confluence_object in self.paginated_cql_retrieval(cql, expand, limit): + _traverse_and_update(confluence_object) + yield confluence_object + + def paginated_cql_user_retrieval( + self, + cql: str, + expand: str | None = None, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + The search/user endpoint can be used to fetch users. + It's a separate endpoint from the content/search endpoint used only for users. + Otherwise it's very similar to the content/search endpoint. + """ + expand_string = f"&expand={expand}" if expand else "" + yield from self._paginate_url( + f"rest/api/search/user?cql={cql}{expand_string}", limit + ) + + def paginated_groups_retrieval( + self, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + This is not an SQL like query. + It's a confluence specific endpoint that can be used to fetch groups. + """ + yield from self._paginate_url("rest/api/group", limit) + + def paginated_group_members_retrieval( + self, + group_name: str, + limit: int | None = None, + ) -> Iterator[dict[str, Any]]: + """ + This is not an SQL like query. + It's a confluence specific endpoint that can be used to fetch the members of a group. 
+ """ + group_name = quote(group_name) + yield from self._paginate_url(f"rest/api/group/{group_name}/member", limit) diff --git a/backend/danswer/connectors/confluence/utils.py b/backend/danswer/connectors/confluence/utils.py index 5c27308a94e..9deff36b269 100644 --- a/backend/danswer/connectors/confluence/utils.py +++ b/backend/danswer/connectors/confluence/utils.py @@ -2,6 +2,7 @@ from datetime import datetime from datetime import timezone from typing import Any +from urllib.parse import quote import bs4 @@ -71,7 +72,9 @@ def _get_user(confluence_client: OnyxConfluence, user_id: str) -> str: def extract_text_from_confluence_html( - confluence_client: OnyxConfluence, confluence_object: dict[str, Any] + confluence_client: OnyxConfluence, + confluence_object: dict[str, Any], + fetched_titles: set[str], ) -> str: """Parse a Confluence html page and replace the 'user Id' by the real User Display Name @@ -79,7 +82,7 @@ def extract_text_from_confluence_html( Args: confluence_object (dict): The confluence object as a dict confluence_client (Confluence): Confluence client - + fetched_titles (set[str]): The titles of the pages that have already been fetched Returns: str: loaded and formated Confluence page """ @@ -101,38 +104,72 @@ def extract_text_from_confluence_html( # Include @ sign for tagging, more clear for LLM user.replaceWith("@" + _get_user(confluence_client, user_id)) - for html_page_reference in soup.findAll("ri:page"): + for html_page_reference in soup.findAll("ac:structured-macro"): + # Here, we only want to process page within page macros + if html_page_reference.attrs.get("ac:name") != "include": + continue + + page_data = html_page_reference.find("ri:page") + if not page_data: + logger.warning( + f"Skipping retrieval of {html_page_reference} because because page data is missing" + ) + continue + + page_title = page_data.attrs.get("ri:content-title") + if not page_title: + # only fetch pages that have a title + logger.warning( + f"Skipping retrieval of 
{html_page_reference} because it has no title" + ) + continue + + if page_title in fetched_titles: + # prevent recursive fetching of pages + logger.debug(f"Skipping {page_title} because it has already been fetched") + continue + + fetched_titles.add(page_title) + # Wrap this in a try-except because there are some pages that might not exist try: - page_title = html_page_reference.attrs["ri:content-title"] - if not page_title: - continue - - page_query = f"type=page and title='{page_title}'" + page_query = f"type=page and title='{quote(page_title)}'" page_contents: dict[str, Any] | None = None # Confluence enforces title uniqueness, so we should only get one result here - for page_batch in confluence_client.paginated_cql_page_retrieval( + for page in confluence_client.paginated_cql_retrieval( cql=page_query, expand="body.storage.value", limit=1, ): - page_contents = page_batch[0] + page_contents = page break - except Exception: + except Exception as e: logger.warning( - f"Error getting page contents for object {confluence_object}" + f"Error getting page contents for object {confluence_object}: {e}" ) continue if not page_contents: continue + text_from_page = extract_text_from_confluence_html( - confluence_client, page_contents + confluence_client=confluence_client, + confluence_object=page_contents, + fetched_titles=fetched_titles, ) html_page_reference.replaceWith(text_from_page) + for html_link_body in soup.findAll("ac:link-body"): + # This extracts the text from inline links in the page so they can be + # represented in the document text as plain text + try: + text_from_link = html_link_body.text + html_link_body.replaceWith(f"(LINK TEXT: {text_from_link})") + except Exception as e: + logger.warning(f"Error processing ac:link-body: {e}") + return format_document_soup(soup) diff --git a/backend/ee/danswer/external_permissions/confluence/group_sync.py b/backend/ee/danswer/external_permissions/confluence/group_sync.py index db5bd9c01cc..fab2bde4ea0 100644 --- 
a/backend/ee/danswer/external_permissions/confluence/group_sync.py +++ b/backend/ee/danswer/external_permissions/confluence/group_sync.py @@ -1,5 +1,3 @@ -from typing import Any - from danswer.connectors.confluence.onyx_confluence import OnyxConfluence from danswer.connectors.confluence.utils import build_confluence_client from danswer.connectors.confluence.utils import get_user_email_from_username__server @@ -15,12 +13,8 @@ def _get_group_members_email_paginated( confluence_client: OnyxConfluence, group_name: str, ) -> set[str]: - members: list[dict[str, Any]] = [] - for member_batch in confluence_client.paginated_group_members_retrieval(group_name): - members.extend(member_batch) - group_member_emails: set[str] = set() - for member in members: + for member in confluence_client.paginated_group_members_retrieval(group_name): email = member.get("email") if not email: user_name = member.get("username") @@ -47,10 +41,9 @@ def confluence_group_sync( # Get all group names group_names: list[str] = [] - for group_batch in confluence_client.paginated_groups_retrieval(): - for group in group_batch: - if group_name := group.get("name"): - group_names.append(group_name) + for group in confluence_client.paginated_groups_retrieval(): + if group_name := group.get("name"): + group_names.append(group_name) # For each group name, get all members and create a danswer group danswer_groups: list[ExternalUserGroup] = []