diff --git a/haystack/nodes/retriever/_embedding_encoder.py b/haystack/nodes/retriever/_embedding_encoder.py
index d07e7ab1b8..f75b997123 100644
--- a/haystack/nodes/retriever/_embedding_encoder.py
+++ b/haystack/nodes/retriever/_embedding_encoder.py
@@ -16,7 +16,7 @@
     HAYSTACK_REMOTE_API_MAX_RETRIES,
     HAYSTACK_REMOTE_API_TIMEOUT_SEC,
 )
-from haystack.errors import CohereError, CohereUnauthorizedError
+from haystack.errors import AWSConfigurationError, CohereError, CohereUnauthorizedError
 from haystack.nodes.retriever._openai_encoder import _OpenAIEmbeddingEncoder
 from haystack.schema import Document
 from haystack.telemetry import send_event
@@ -42,6 +42,9 @@
     from haystack.modeling.infer import Inferencer
     from haystack.nodes.retriever._losses import _TRAINING_LOSSES
 
+with LazyImport(message="Run 'pip install boto3'") as boto3_import:
+    import boto3
+    from botocore.exceptions import BotoCoreError
 
 COHERE_TIMEOUT = float(os.environ.get(HAYSTACK_REMOTE_API_TIMEOUT_SEC, 30))
 COHERE_BACKOFF = int(os.environ.get(HAYSTACK_REMOTE_API_BACKOFF_SEC, 10))
@@ -55,6 +58,8 @@
     "embed-multilingual-v2.0",
 ]
 
+BEDROCK_EMBEDDING_MODELS = ["amazon.titan-embed-text-v1", "cohere.embed-english-v3", "cohere.embed-multilingual-v3"]
+
 
 class _DefaultEmbeddingEncoder(_BaseEmbeddingEncoder):
     def __init__(self, retriever: "EmbeddingRetriever"):
@@ -434,6 +439,107 @@ def save(self, save_dir: Union[Path, str]):
         raise NotImplementedError(f"Saving is not implemented for {self.__class__}")
 
 
+class _BedrockEmbeddingEncoder(_BaseEmbeddingEncoder):
+    def __init__(self, retriever: "EmbeddingRetriever"):
+        """Embedding Encoder for Bedrock models
+        See https://docs.aws.amazon.com/bedrock/latest/userguide/embeddings.html for more details.
+        The maximum input text is 8K tokens and the maximum output vector length is 1536.
+        Titan embeddings do not support batch operations.
+
+        :param retriever: EmbeddingRetriever object
+        """
+        boto3_import.check()
+        if retriever.embedding_model not in BEDROCK_EMBEDDING_MODELS:
+            raise ValueError("Model not supported by Bedrock Embedding Encoder")
+        self.model = retriever.embedding_model
+        self.client = self._initialize_boto3_session(retriever.aws_config).client("bedrock-runtime")
+
+    def _initialize_boto3_session(self, aws_config: Optional[Dict[str, Any]]):
+        if aws_config is None:
+            raise ValueError(
+                "`aws_config` is not set. To use Bedrock models, you should set `aws_config` when initializing the retriever."
+            )
+
+        aws_access_key_id = aws_config.get("aws_access_key_id", None)
+        aws_secret_access_key = aws_config.get("aws_secret_access_key", None)
+        aws_session_token = aws_config.get("aws_session_token", None)
+        region_name = aws_config.get("region_name", None)
+        profile_name = aws_config.get("profile_name", None)
+        try:
+            return boto3.Session(
+                aws_access_key_id=aws_access_key_id,
+                aws_secret_access_key=aws_secret_access_key,
+                aws_session_token=aws_session_token,
+                region_name=region_name,
+                profile_name=profile_name,
+            )
+        except BotoCoreError as e:
+            raise AWSConfigurationError(
+                f"Failed to initialize the session with provided AWS credentials {aws_config}"
+            ) from e
+
+    def _embed_batch_cohere(
+        self, texts: List[str], input_type: Literal["search_query", "search_document"]
+    ) -> np.ndarray:
+        cohere_payload = {"texts": texts, "input_type": input_type, "truncate": "RIGHT"}
+        response = self._invoke_model(cohere_payload)
+        embeddings = np.array(response["embeddings"])
+        return embeddings
+
+    def _embed_titan(self, text: str) -> np.ndarray:
+        titan_payload = {"inputText": text}
+        response = self._invoke_model(titan_payload)
+        embeddings = np.array(response["embedding"])
+        return embeddings
+
+    def _invoke_model(self, payload: Dict[str, Any]) -> Dict[str, Any]:
+        body = json.dumps(payload)
+        response = self.client.invoke_model(
+            body=body, modelId=self.model, accept="application/json", contentType="application/json"
+        )
+        body = response.get("body").read().decode("utf-8")
+        response_body = json.loads(body)
+        return response_body
+
+    def embed_queries(self, queries: List[str]) -> np.ndarray:
+        if self.model == "amazon.titan-embed-text-v1":
+            all_embeddings = []
+            for query in queries:
+                generated_embeddings = self._embed_titan(query)
+                all_embeddings.append(generated_embeddings)
+            return np.stack(all_embeddings)
+        else:
+            return self._embed_batch_cohere(queries, input_type="search_query")
+
+    def embed_documents(self, docs: List[Document]) -> np.ndarray:
+        if self.model == "amazon.titan-embed-text-v1":
+            all_embeddings = []
+            for doc in docs:
+                generated_embeddings = self._embed_titan(doc.content)
+                all_embeddings.append(generated_embeddings)
+            return np.stack(all_embeddings)
+        else:
+            contents = [d.content for d in docs]
+            return self._embed_batch_cohere(contents, input_type="search_document")
+
+    def train(
+        self,
+        training_data: List[Dict[str, Any]],
+        learning_rate: float = 2e-5,
+        n_epochs: int = 1,
+        num_warmup_steps: Optional[int] = None,
+        batch_size: int = 16,
+        train_loss: Literal["mnrl", "margin_mse"] = "mnrl",
+        num_workers: int = 0,
+        use_amp: bool = False,
+        **kwargs,
+    ):
+        raise NotImplementedError(f"Training is not implemented for {self.__class__}")
+
+    def save(self, save_dir: Union[Path, str]):
+        raise NotImplementedError(f"Saving is not implemented for {self.__class__}")
+
+
 _EMBEDDING_ENCODERS: Dict[str, Callable] = {
     "farm": _DefaultEmbeddingEncoder,
     "transformers": _DefaultEmbeddingEncoder,
@@ -441,4 +547,5 @@ def save(self, save_dir: Union[Path, str]):
     "retribert": _RetribertEmbeddingEncoder,
     "openai": _OpenAIEmbeddingEncoder,
     "cohere": _CohereEmbeddingEncoder,
+    "bedrock": _BedrockEmbeddingEncoder,
 }
diff --git a/haystack/nodes/retriever/dense.py b/haystack/nodes/retriever/dense.py
index d4663e6cfd..3139ec1561 100644
--- a/haystack/nodes/retriever/dense.py
+++ b/haystack/nodes/retriever/dense.py
@@ -17,7 +17,11 @@
 from haystack.schema import Document, FilterType
 from haystack.document_stores import BaseDocumentStore
 from haystack.nodes.retriever.base import BaseRetriever
-from haystack.nodes.retriever._embedding_encoder import _EMBEDDING_ENCODERS, COHERE_EMBEDDING_MODELS
+from haystack.nodes.retriever._embedding_encoder import (
+    _EMBEDDING_ENCODERS,
+    COHERE_EMBEDDING_MODELS,
+    BEDROCK_EMBEDDING_MODELS,
+)
 from haystack.utils.early_stopping import EarlyStopping
 from haystack.telemetry import send_event
 from haystack.lazy_imports import LazyImport
@@ -1468,13 +1472,15 @@ def __init__(
         azure_deployment_name: Optional[str] = None,
         api_base: str = "https://api.openai.com/v1",
         openai_organization: Optional[str] = None,
+        aws_config: Optional[Dict[str, Any]] = None,
     ):
         """
        :param document_store: An instance of DocumentStore from which to retrieve documents.
        :param embedding_model: Local path or name of model in Hugging Face's model hub such as ``'sentence-transformers/all-MiniLM-L6-v2'``.
                                The embedding model could also potentially be an OpenAI model ["ada", "babbage", "davinci", "curie"] or
-                               a Cohere model ["embed-english-v2.0", "embed-english-light-v2.0", "embed-multilingual-v2.0"].
+                               a Cohere model ["embed-english-v2.0", "embed-english-light-v2.0", "embed-multilingual-v2.0"] or
+                               an AWS Bedrock model ["amazon.titan-embed-text-v1", "cohere.embed-english-v3", "cohere.embed-multilingual-v3"].
        :param model_version: The version of model to use from the HuggingFace model hub. Can be tag name, branch name, or commit hash.
        :param use_gpu: Whether to use all available GPUs or the CPU. Falls back on CPU if no GPU is available.
        :param batch_size: Number of documents to encode at once.
@@ -1489,6 +1495,7 @@ def __init__(
                             4. `retribert` : (will use `_RetribertEmbeddingEncoder` as embedding encoder)
                             5. `openai` : (will use `_OpenAIEmbeddingEncoder` as embedding encoder)
                             6. `cohere` : (will use `_CohereEmbeddingEncoder` as embedding encoder)
+                            7. `bedrock` : (will use `_BedrockEmbeddingEncoder` as embedding encoder)
        :param pooling_strategy: Strategy for combining the embeddings from the model (for farm / transformers models only).
                                 Options:
@@ -1533,6 +1540,7 @@ def __init__(
        :param api_base: The OpenAI API base URL, defaults to `"https://api.openai.com/v1"`.
        :param openai_organization: The OpenAI-Organization ID, defaults to `None`. For more details, see OpenAI
            [documentation](https://platform.openai.com/docs/api-reference/requesting-organization).
+       :param aws_config: A dictionary of AWS credentials and session settings passed to the boto3 Session for an AWS Bedrock retriever. Supported keys: `aws_access_key_id`, `aws_secret_access_key`, `aws_session_token`, `region_name`, and `profile_name`. Defaults to `None`.
""" torch_and_transformers_import.check() @@ -1565,6 +1573,7 @@ def __init__( self.azure_base_url = azure_base_url self.azure_deployment_name = azure_deployment_name self.openai_organization = openai_organization + self.aws_config = aws_config self.model_format = ( self._infer_model_format(model_name_or_path=embedding_model, use_auth_token=use_auth_token) if model_format is None @@ -1885,6 +1894,7 @@ def _preprocess_documents(self, docs: List[Document]) -> List[Document]: @staticmethod def _infer_model_format(model_name_or_path: str, use_auth_token: Optional[Union[str, bool]]) -> str: + # pylint: disable=too-many-return-statements valid_openai_model_name = model_name_or_path in ["ada", "babbage", "davinci", "curie"] or any( m in model_name_or_path for m in ["-ada-", "-babbage-", "-davinci-", "-curie-"] ) @@ -1892,6 +1902,8 @@ def _infer_model_format(model_name_or_path: str, use_auth_token: Optional[Union[ return "openai" if model_name_or_path in COHERE_EMBEDDING_MODELS: return "cohere" + if model_name_or_path in BEDROCK_EMBEDDING_MODELS: + return "bedrock" # Check if model name is a local directory with sentence transformers config file in it if Path(model_name_or_path).exists(): if Path(f"{model_name_or_path}/config_sentence_transformers.json").exists(): diff --git a/releasenotes/notes/aws-bedrock-embedding-encoder-a978884c1a2c8237.yaml b/releasenotes/notes/aws-bedrock-embedding-encoder-a978884c1a2c8237.yaml new file mode 100644 index 0000000000..caa1f479b8 --- /dev/null +++ b/releasenotes/notes/aws-bedrock-embedding-encoder-a978884c1a2c8237.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Adding Bedrock Embeddings Encoder to use as a retriever.