From 9ef93ecd7c6e19a88acf833aa3429f432e8a7411 Mon Sep 17 00:00:00 2001 From: Rahul Triptahi Date: Tue, 25 Jun 2024 03:00:38 +0530 Subject: [PATCH] community[minor]: Added classifier_location parameter in PebbloSafeLoader. (#22565) Description: Add classifier_location feature flag. This flag enables Pebblo to decide the classifier location, local or pebblo-cloud. Unit Tests: N/A Documentation: N/A --------- Signed-off-by: Rahul Tripathi Co-authored-by: Rahul Tripathi --- .../chains/pebblo_retrieval/base.py | 182 ++++++++++++------ .../chains/pebblo_retrieval/models.py | 9 +- .../document_loaders/pebblo.py | 142 ++++++++------ .../langchain_community/utilities/pebblo.py | 79 ++++---- 4 files changed, 243 insertions(+), 169 deletions(-) diff --git a/libs/community/langchain_community/chains/pebblo_retrieval/base.py b/libs/community/langchain_community/chains/pebblo_retrieval/base.py index 6097c5b29cb04..4fb769231484a 100644 --- a/libs/community/langchain_community/chains/pebblo_retrieval/base.py +++ b/libs/community/langchain_community/chains/pebblo_retrieval/base.py @@ -5,6 +5,7 @@ import datetime import inspect +import json import logging from http import HTTPStatus from typing import Any, Dict, List, Optional @@ -72,7 +73,9 @@ class PebbloRetrievalQA(Chain): """Pebblo cloud API key for app.""" classifier_url: str = CLASSIFIER_URL #: :meta private: """Classifier endpoint.""" - _discover_sent: bool = False #: :meta private: + classifier_location: str = "local" #: :meta private: + """Classifier location. 
It could be either of 'local' or 'pebblo-cloud'.""" + _discover_sent = False #: :meta private: """Flag to check if discover payload has been sent.""" _prompt_sent: bool = False #: :meta private: """Flag to check if prompt payload has been sent.""" @@ -94,6 +97,7 @@ def _call( answer, docs = res['result'], res['source_documents'] """ prompt_time = datetime.datetime.now().isoformat() + PebbloRetrievalQA.set_prompt_sent(value=False) _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager() question = inputs[self.input_key] auth_context = inputs.get(self.auth_context_key, {}) @@ -115,7 +119,9 @@ def _call( "name": self.app_name, "context": [ { - "retrieved_from": doc.metadata.get("source"), + "retrieved_from": doc.metadata.get( + "full_path", doc.metadata.get("source") + ), "doc": doc.page_content, "vector_db": self.retriever.vectorstore.__class__.__name__, } @@ -131,6 +137,7 @@ def _call( "user_identities": auth_context.user_auth if auth_context and hasattr(auth_context, "user_auth") else [], + "classifier_location": self.classifier_location, } qa_payload = Qa(**qa) self._send_prompt(qa_payload) @@ -220,6 +227,7 @@ def from_chain_type( chain_type_kwargs: Optional[dict] = None, api_key: Optional[str] = None, classifier_url: str = CLASSIFIER_URL, + classifier_location: str = "local", **kwargs: Any, ) -> "PebbloRetrievalQA": """Load chain from chain type.""" @@ -231,7 +239,7 @@ def from_chain_type( ) # generate app - app = PebbloRetrievalQA._get_app_details( + app: App = PebbloRetrievalQA._get_app_details( app_name=app_name, description=description, owner=owner, @@ -240,7 +248,10 @@ def from_chain_type( ) PebbloRetrievalQA._send_discover( - app, api_key=api_key, classifier_url=classifier_url + app, + api_key=api_key, + classifier_url=classifier_url, + classifier_location=classifier_location, ) return cls( @@ -250,6 +261,7 @@ def from_chain_type( description=description, api_key=api_key, classifier_url=classifier_url, + 
classifier_location=classifier_location, **kwargs, ) @@ -300,7 +312,9 @@ async def _aget_docs( ) @staticmethod - def _get_app_details(app_name, owner, description, llm, **kwargs) -> App: # type: ignore + def _get_app_details( # type: ignore + app_name: str, owner: str, description: str, llm: BaseLanguageModel, **kwargs + ) -> App: """Fetch app details. Internal method. Returns: App: App details. @@ -319,38 +333,49 @@ def _get_app_details(app_name, owner, description, llm, **kwargs) -> App: # typ return app @staticmethod - def _send_discover(app, api_key, classifier_url) -> None: # type: ignore + def _send_discover( + app: App, + api_key: Optional[str], + classifier_url: str, + classifier_location: str, + ) -> None: # type: ignore """Send app discovery payload to pebblo-server. Internal method.""" headers = { "Accept": "application/json", "Content-Type": "application/json", } payload = app.dict(exclude_unset=True) - app_discover_url = f"{classifier_url}{APP_DISCOVER_URL}" - try: - pebblo_resp = requests.post( - app_discover_url, headers=headers, json=payload, timeout=20 - ) - logger.debug("discover-payload: %s", payload) - logger.debug( - "send_discover[local]: request url %s, body %s len %s\ - response status %s body %s", - pebblo_resp.request.url, - str(pebblo_resp.request.body), - str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])), - str(pebblo_resp.status_code), - pebblo_resp.json(), - ) - if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: - PebbloRetrievalQA.set_discover_sent() - else: - logger.warning( - f"Received unexpected HTTP response code: {pebblo_resp.status_code}" + if classifier_location == "local": + app_discover_url = f"{classifier_url}{APP_DISCOVER_URL}" + try: + pebblo_resp = requests.post( + app_discover_url, headers=headers, json=payload, timeout=20 ) - except requests.exceptions.RequestException: - logger.warning("Unable to reach pebblo server.") - except Exception as e: - logger.warning("An Exception 
caught in _send_discover: local %s", e) + logger.debug("discover-payload: %s", payload) + logger.debug( + "send_discover[local]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_resp.request.url, + str(pebblo_resp.request.body), + str( + len( + pebblo_resp.request.body if pebblo_resp.request.body else [] + ) + ), + str(pebblo_resp.status_code), + pebblo_resp.json(), + ) + if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: + PebbloRetrievalQA.set_discover_sent() + else: + logger.warning( + "Received unexpected HTTP response code:" + + f"{pebblo_resp.status_code}" + ) + except requests.exceptions.RequestException: + logger.warning("Unable to reach pebblo server.") + except Exception as e: + logger.warning("An Exception caught in _send_discover: local %s", e) if api_key: try: @@ -385,8 +410,8 @@ def set_discover_sent(cls) -> None: cls._discover_sent = True @classmethod - def set_prompt_sent(cls) -> None: - cls._prompt_sent = True + def set_prompt_sent(cls, value: bool = True) -> None: + cls._prompt_sent = value def _send_prompt(self, qa_payload: Qa) -> None: headers = { @@ -394,39 +419,73 @@ def _send_prompt(self, qa_payload: Qa) -> None: "Content-Type": "application/json", } app_discover_url = f"{self.classifier_url}{PROMPT_URL}" - try: - pebblo_resp = requests.post( - app_discover_url, headers=headers, json=qa_payload.dict(), timeout=20 - ) - logger.debug("prompt-payload: %s", qa_payload) - logger.debug( - "send_prompt[local]: request url %s, body %s len %s\ - response status %s body %s", - pebblo_resp.request.url, - str(pebblo_resp.request.body), - str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])), - str(pebblo_resp.status_code), - pebblo_resp.json(), - ) - if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: - PebbloRetrievalQA.set_prompt_sent() - else: - logger.warning( - f"Received unexpected HTTP response code: {pebblo_resp.status_code}" + pebblo_resp = None + payload = 
qa_payload.dict(exclude_unset=True) + if self.classifier_location == "local": + try: + pebblo_resp = requests.post( + app_discover_url, + headers=headers, + json=payload, + timeout=20, ) - except requests.exceptions.RequestException: - logger.warning("Unable to reach pebblo server.") - except Exception as e: - logger.warning("An Exception caught in _send_discover: local %s", e) + logger.debug("prompt-payload: %s", payload) + logger.debug( + "send_prompt[local]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_resp.request.url, + str(pebblo_resp.request.body), + str( + len( + pebblo_resp.request.body if pebblo_resp.request.body else [] + ) + ), + str(pebblo_resp.status_code), + pebblo_resp.json(), + ) + if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: + PebbloRetrievalQA.set_prompt_sent() + else: + logger.warning( + "Received unexpected HTTP response code:" + + f"{pebblo_resp.status_code}" + ) + except requests.exceptions.RequestException: + logger.warning("Unable to reach pebblo server.") + except Exception as e: + logger.warning("An Exception caught in _send_discover: local %s", e) + # If classifier location is local, then response, context and prompt + # should be fetched from pebblo_resp and replaced in payload. 
if self.api_key: + if self.classifier_location == "local": + if pebblo_resp: + payload["response"] = ( + json.loads(pebblo_resp.text) + .get("retrieval_data", {}) + .get("response", {}) + ) + payload["context"] = ( + json.loads(pebblo_resp.text) + .get("retrieval_data", {}) + .get("context", []) + ) + payload["prompt"] = ( + json.loads(pebblo_resp.text) + .get("retrieval_data", {}) + .get("prompt", {}) + ) + else: + payload["response"] = None + payload["context"] = None + payload["prompt"] = None + headers.update({"x-api-key": self.api_key}) + pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{PROMPT_URL}" try: - headers.update({"x-api-key": self.api_key}) - pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{PROMPT_URL}" pebblo_cloud_response = requests.post( pebblo_cloud_url, headers=headers, - json=qa_payload.dict(), + json=payload, timeout=20, ) @@ -449,9 +508,12 @@ def _send_prompt(self, qa_payload: Qa) -> None: logger.warning("Unable to reach Pebblo cloud server.") except Exception as e: logger.warning("An Exception caught in _send_prompt: cloud %s", e) + elif self.classifier_location == "pebblo-cloud": + logger.warning("API key is missing for sending prompt to Pebblo cloud.") + raise NameError("API key is missing for sending prompt to Pebblo cloud.") @classmethod - def get_chain_details(cls, llm, **kwargs): # type: ignore + def get_chain_details(cls, llm: BaseLanguageModel, **kwargs): # type: ignore llm_dict = llm.__dict__ chain = [ { @@ -474,6 +536,6 @@ def get_chain_details(cls, llm, **kwargs): # type: ignore ), } ], - } + }, ] return chain diff --git a/libs/community/langchain_community/chains/pebblo_retrieval/models.py b/libs/community/langchain_community/chains/pebblo_retrieval/models.py index 3dc344dd38c72..3b7f94d44c8a4 100644 --- a/libs/community/langchain_community/chains/pebblo_retrieval/models.py +++ b/libs/community/langchain_community/chains/pebblo_retrieval/models.py @@ -1,6 +1,6 @@ """Models for the PebbloRetrievalQA chain.""" -from typing import Any, List, Optional 
+from typing import Any, List, Optional, Union from langchain_core.pydantic_v1 import BaseModel @@ -137,9 +137,10 @@ class Prompt(BaseModel): class Qa(BaseModel): name: str - context: List[Optional[Context]] - prompt: Prompt - response: Prompt + context: Union[List[Optional[Context]], Optional[Context]] + prompt: Optional[Prompt] + response: Optional[Prompt] prompt_time: str user: str user_identities: Optional[List[str]] + classifier_location: str diff --git a/libs/community/langchain_community/document_loaders/pebblo.py b/libs/community/langchain_community/document_loaders/pebblo.py index ed203b2c88d75..48c8a231fdb61 100644 --- a/libs/community/langchain_community/document_loaders/pebblo.py +++ b/libs/community/langchain_community/document_loaders/pebblo.py @@ -46,6 +46,8 @@ def __init__( api_key: Optional[str] = None, load_semantic: bool = False, classifier_url: Optional[str] = None, + *, + classifier_location: str = "local", ): if not name or not isinstance(name, str): raise NameError("Must specify a valid name.") @@ -65,6 +67,7 @@ def __init__( self.source_path_size = self.get_source_size(self.source_path) self.source_aggregate_size = 0 self.classifier_url = classifier_url or CLASSIFIER_URL + self.classifier_location = classifier_location self.loader_details = { "loader": loader_name, "source_path": self.source_path, @@ -158,6 +161,7 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list: PebbloSafeLoader.set_loader_sent() doc_content = [doc.dict() for doc in loaded_docs] docs = [] + classified_docs = [] for doc in doc_content: doc_metadata = doc.get("metadata", {}) doc_authorized_identities = doc_metadata.get("authorized_identities", []) @@ -204,6 +208,7 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list: "loader_details": self.loader_details, "loading_end": "false", "source_owner": self.source_owner, + "classifier_location": self.classifier_location, } if loading_end is True: payload["loading_end"] = 
"true" @@ -212,39 +217,46 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list: "source_aggregate_size" ] = self.source_aggregate_size payload = Doc(**payload).dict(exclude_unset=True) - load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}" - classified_docs = [] - try: - pebblo_resp = requests.post( - load_doc_url, headers=headers, json=payload, timeout=300 - ) - classified_docs = json.loads(pebblo_resp.text).get("docs", None) - if pebblo_resp.status_code not in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: - logger.warning( - "Received unexpected HTTP response code: %s", - pebblo_resp.status_code, + # Raw payload to be sent to classifier + if self.classifier_location == "local": + load_doc_url = f"{self.classifier_url}{LOADER_DOC_URL}" + try: + pebblo_resp = requests.post( + load_doc_url, headers=headers, json=payload, timeout=300 ) - logger.debug( - "send_loader_doc[local]: request url %s, body %s len %s\ - response status %s body %s", - pebblo_resp.request.url, - str(pebblo_resp.request.body), - str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])), - str(pebblo_resp.status_code), - pebblo_resp.json(), - ) - except requests.exceptions.RequestException: - logger.warning("Unable to reach pebblo server.") - except Exception as e: - logger.warning("An Exception caught in _send_loader_doc: local %s", e) + classified_docs = json.loads(pebblo_resp.text).get("docs", None) + if pebblo_resp.status_code not in [ + HTTPStatus.OK, + HTTPStatus.BAD_GATEWAY, + ]: + logger.warning( + "Received unexpected HTTP response code: %s", + pebblo_resp.status_code, + ) + logger.debug( + "send_loader_doc[local]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_resp.request.url, + str(pebblo_resp.request.body), + str( + len( + pebblo_resp.request.body if pebblo_resp.request.body else [] + ) + ), + str(pebblo_resp.status_code), + pebblo_resp.json(), + ) + except requests.exceptions.RequestException: + logger.warning("Unable 
to reach pebblo server.") + except Exception as e: + logger.warning("An Exception caught in _send_loader_doc: local %s", e) + if self.api_key: - if not classified_docs: - return classified_docs - try: + if self.classifier_location == "local": payload["docs"] = classified_docs - payload["classified"] = True - headers.update({"x-api-key": self.api_key}) - pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}" + headers.update({"x-api-key": self.api_key}) + pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{LOADER_DOC_URL}" + try: pebblo_cloud_response = requests.post( pebblo_cloud_url, headers=headers, json=payload, timeout=20 ) @@ -267,9 +279,10 @@ def _classify_doc(self, loaded_docs: list, loading_end: bool = False) -> list: logger.warning("Unable to reach Pebblo cloud server.") except Exception as e: logger.warning("An Exception caught in _send_loader_doc: cloud %s", e) + elif self.classifier_location == "pebblo-cloud": + logger.warning("API key is missing for sending docs to Pebblo cloud.") + raise NameError("API key is missing for sending docs to Pebblo cloud.") - if loading_end is True: - PebbloSafeLoader.set_loader_sent() return classified_docs @staticmethod @@ -298,45 +311,50 @@ def _send_discover(self) -> None: "Content-Type": "application/json", } payload = self.app.dict(exclude_unset=True) - app_discover_url = f"{self.classifier_url}{APP_DISCOVER_URL}" - try: - pebblo_resp = requests.post( - app_discover_url, headers=headers, json=payload, timeout=20 - ) - logger.debug( - "send_discover[local]: request url %s, body %s len %s\ - response status %s body %s", - pebblo_resp.request.url, - str(pebblo_resp.request.body), - str(len(pebblo_resp.request.body if pebblo_resp.request.body else [])), - str(pebblo_resp.status_code), - pebblo_resp.json(), - ) - if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: - PebbloSafeLoader.set_discover_sent() - else: - logger.warning( - f"Received unexpected HTTP response code: {pebblo_resp.status_code}" + # Raw 
discover payload to be sent to classifier + if self.classifier_location == "local": + app_discover_url = f"{self.classifier_url}{APP_DISCOVER_URL}" + try: + pebblo_resp = requests.post( + app_discover_url, headers=headers, json=payload, timeout=20 ) - except requests.exceptions.RequestException: - logger.warning("Unable to reach pebblo server.") - except Exception as e: - logger.warning("An Exception caught in _send_discover: local %s", e) + logger.debug( + "send_discover[local]: request url %s, body %s len %s\ + response status %s body %s", + pebblo_resp.request.url, + str(pebblo_resp.request.body), + str( + len( + pebblo_resp.request.body if pebblo_resp.request.body else [] + ) + ), + str(pebblo_resp.status_code), + pebblo_resp.json(), + ) + if pebblo_resp.status_code in [HTTPStatus.OK, HTTPStatus.BAD_GATEWAY]: + PebbloSafeLoader.set_discover_sent() + else: + logger.warning( + f"Received unexpected HTTP response code:\ + {pebblo_resp.status_code}" + ) + except requests.exceptions.RequestException: + logger.warning("Unable to reach pebblo server.") + except Exception as e: + logger.warning("An Exception caught in _send_discover: local %s", e) if self.api_key: try: headers.update({"x-api-key": self.api_key}) + # If the pebblo_resp is None, + # then the pebblo server version is not available if pebblo_resp: pebblo_server_version = json.loads(pebblo_resp.text).get( "pebblo_server_version" ) - payload.update( - { - "pebblo_server_version": pebblo_server_version, - "pebblo_client_version": payload["plugin_version"], - } - ) - payload.pop("plugin_version") + payload.update({"pebblo_server_version": pebblo_server_version}) + + payload.update({"pebblo_client_version": PLUGIN_VERSION}) pebblo_cloud_url = f"{PEBBLO_CLOUD_URL}{APP_DISCOVER_URL}" pebblo_cloud_response = requests.post( pebblo_cloud_url, headers=headers, json=payload, timeout=20 diff --git a/libs/community/langchain_community/utilities/pebblo.py b/libs/community/langchain_community/utilities/pebblo.py index 
9ed43630c723b..377155c71f4a3 100644 --- a/libs/community/langchain_community/utilities/pebblo.py +++ b/libs/community/langchain_community/utilities/pebblo.py @@ -63,93 +63,86 @@ class IndexedDocument(Document): + """Pebblo Indexed Document.""" + id: str + """Unique ID of the document.""" class Runtime(BaseModel): - """Pebblo Runtime. - - Args: - type (Optional[str]): Runtime type. Defaults to "" - host (str): Hostname of runtime. - path (str): Current working directory path. - ip (Optional[str]): Ip of current runtime. Defaults to "" - platform (str): Platform details of current runtime. - os (str): OS name. - os_version (str): OS version. - language (str): Runtime kernel. - language_version (str): version of current runtime kernel. - runtime (Optional[str]) More runtime details. Defaults to "" - """ + """Pebblo Runtime.""" type: str = "local" + """Runtime type. Defaults to 'local'.""" host: str + """Host name of the runtime.""" path: str + """Current working directory path.""" ip: Optional[str] = "" + """IP address of the runtime. Defaults to ''.""" platform: str + """Platform details of the runtime.""" os: str + """OS name.""" os_version: str + """OS version.""" language: str + """Runtime kernel.""" language_version: str + """Version of the runtime kernel.""" runtime: str = "local" + """More runtime details. Defaults to 'local'.""" class Framework(BaseModel): - """Pebblo Framework instance. - - Args: - name (str): Name of the Framework. - version (str): Version of the Framework. - """ + """Pebblo Framework instance.""" name: str + """Name of the Framework.""" version: str + """Version of the Framework.""" class App(BaseModel): - """Pebblo AI application. - - Args: - name (str): Name of the app. - owner (str): Owner of the app. - description (Optional[str]): Description of the app. - load_id (str): Unique load_id of the app instance. - runtime (Runtime): Runtime details of app. 
- framework (Framework): Framework details of the app - plugin_version (str): Plugin version used for the app. - """ + """Pebblo AI application.""" name: str + """Name of the app.""" owner: str + """Owner of the app.""" description: Optional[str] + """Description of the app.""" load_id: str + """Unique load_id of the app instance.""" runtime: Runtime + """Runtime details of the app.""" framework: Framework + """Framework details of the app.""" plugin_version: str + """Plugin version used for the app.""" class Doc(BaseModel): - """Pebblo document. - - Args: - name (str): Name of app originating this document. - owner (str): Owner of app. - docs (list): List of documents with its metadata. - plugin_version (str): Pebblo plugin Version - load_id (str): Unique load_id of the app instance. - loader_details (dict): Loader details with its metadata. - loading_end (bool): Boolean, specifying end of loading of source. - source_owner (str): Owner of the source of the loader. - """ + """Pebblo document.""" name: str + """Name of app originating this document.""" owner: str + """Owner of app.""" docs: list + """List of documents with its metadata.""" plugin_version: str + """Pebblo plugin Version""" load_id: str + """Unique load_id of the app instance.""" loader_details: dict + """Loader details with its metadata.""" loading_end: bool + """Boolean, specifying end of loading of source.""" source_owner: str + """Owner of the source of the loader.""" + classifier_location: str + """Location of the classifier.""" def get_full_path(path: str) -> str: