diff --git a/comps/cores/mega/gateway.py b/comps/cores/mega/gateway.py index dc6b076bc..e6a0d6dcf 100644 --- a/comps/cores/mega/gateway.py +++ b/comps/cores/mega/gateway.py @@ -1048,13 +1048,13 @@ def parser_input(data, TypeClass, key): if isinstance(response, StreamingResponse): return response last_node = runtime_graph.all_leaves()[-1] - response = result_dict[last_node]["text"] + response_content = result_dict[last_node]["choices"][0]["message"]["content"] choices = [] usage = UsageInfo() choices.append( ChatCompletionResponseChoice( index=0, - message=ChatMessage(role="assistant", content=response), + message=ChatMessage(role="assistant", content=response_content), finish_reason="stop", ) ) diff --git a/comps/dataprep/neo4j/llama_index/README.md b/comps/dataprep/neo4j/llama_index/README.md index 47b1bef33..74822f189 100644 --- a/comps/dataprep/neo4j/llama_index/README.md +++ b/comps/dataprep/neo4j/llama_index/README.md @@ -1,5 +1,14 @@ # Dataprep Microservice with Neo4J +This Dataprep microservice performs: + +- Graph extraction (entities, relationships and descripttions) using LLM +- Performs hierarchical_leiden clustering to identify communities in the knowledge graph +- Generates a community symmary for each community +- Stores all of the above in Neo4j Graph DB + +This microservice follows the graphRAG approached defined by Microsoft paper ["From Local to Global: A Graph RAG Approach to Query-Focused Summarization"](https://www.microsoft.com/en-us/research/publication/from-local-to-global-a-graph-rag-approach-to-query-focused-summarization/) with some differences such as: 1) only level zero cluster summaries are leveraged, 2) The input context to the final answer generation is trimmed to fit maximum context length. + This dataprep microservice ingests the input files and uses LLM (TGI or OpenAI model when OPENAI_API_KEY is set) to extract entities, relationships and descriptions of those to build a graph-based text index. ## Setup Environment Variables @@ -78,6 +87,11 @@ curl -X POST \ http://${host_ip}:6004/v1/dataprep ``` +Please note that clustering of extracted entities and summarization happens in this data preparation step. The result of this is: + +- Large processing time for large dataset. An LLM call is done to summarize each cluster which may result in large volume of LLM calls +- Need to clean graph GB entity_info and Cluster if dataprep is run multiple times since the resulting cluster numbering will differ between consecutive calls and will corrupt the results. + We support table extraction from pdf documents. You can specify process_table and table_strategy by the following commands. "table_strategy" refers to the strategies to understand tables for table retrieval. As the setting progresses from "fast" to "hq" to "llm," the focus shifts towards deeper table understanding at the expense of processing speed. The default strategy is "fast". Note: If you specify "table_strategy=llm" TGI service will be used. diff --git a/comps/dataprep/neo4j/llama_index/extract_graph_neo4j.py b/comps/dataprep/neo4j/llama_index/extract_graph_neo4j.py index db5c5151f..198b61048 100644 --- a/comps/dataprep/neo4j/llama_index/extract_graph_neo4j.py +++ b/comps/dataprep/neo4j/llama_index/extract_graph_neo4j.py @@ -40,6 +40,7 @@ from llama_index.llms.text_generation_inference import TextGenerationInference from neo4j import GraphDatabase from openai import Client +from transformers import AutoTokenizer from comps import CustomLogger, DocPath, opea_microservices, register_microservice from comps.dataprep.utils import ( @@ -67,14 +68,24 @@ class GraphRAGStore(Neo4jPropertyGraphStore): # https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/cookbooks/GraphRAG_v2.ipynb community_summary = {} entity_info = None - max_cluster_size = 5 + max_cluster_size = 100 def __init__(self, username: str, password: str, url: str, llm: LLM): super().__init__(username=username, password=password, url=url) self.llm = llm + self.driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) def generate_community_summary(self, text): """Generate summary for a given text using an LLM.""" + # Get model information from the TGI endpoint + model_name = get_attribute_from_tgi_endpoint(TGI_LLM_ENDPOINT, "model_id") + max_input_length = get_attribute_from_tgi_endpoint(TGI_LLM_ENDPOINT, "max_input_length") + if not model_name or not max_input_length: + raise ValueError(f"Could not retrieve model information from TGI endpoint: {TGI_LLM_ENDPOINT}") + + # Get the tokenizer + tokenizer = AutoTokenizer.from_pretrained(model_name) + messages = [ ChatMessage( role="system", @@ -89,10 +100,14 @@ def generate_community_summary(self, text): ), ChatMessage(role="user", content=text), ] + # Trim the messages to fit within the token limit + # Microsoft does more sophisticated content optimization + trimmed_messages = trim_messages_to_token_limit(tokenizer, messages, max_input_length) + if OPENAI_API_KEY: response = OpenAI().chat(messages) else: - response = self.llm.chat(messages) + response = self.llm.chat(trimmed_messages) clean_response = re.sub(r"^assistant:\s*", "", str(response)).strip() return clean_response @@ -101,13 +116,21 @@ def build_communities(self): """Builds communities from the graph and summarizes them.""" nx_graph = self._create_nx_graph() community_hierarchical_clusters = hierarchical_leiden(nx_graph, max_cluster_size=self.max_cluster_size) + logger.info(f"Number of clustered entities: {len(community_hierarchical_clusters)}") + logger.info(f"Community hierarchical clusters: {community_hierarchical_clusters}") + n_clusters = set([hc.cluster for hc in community_hierarchical_clusters]) + logger.info(f"number of communities/clusters: {len(n_clusters)}") self.entity_info, community_info = self._collect_community_info(nx_graph, community_hierarchical_clusters) + # self._print_cluster_info(self.entity_info, community_info) + self.save_entity_info(self.entity_info) + # entity_from_db = self.read_entity_info() # to verify if the data is stored in db self._summarize_communities(community_info) + # sum = self.read_all_community_summaries() # to verify summaries are stored in db def _create_nx_graph(self): """Converts internal graph representation to NetworkX graph.""" nx_graph = nx.Graph() - triplets = self.get_triplets() + triplets = self.get_triplets() # [src, rel, tgt] for entity1, relation, entity2 in triplets: nx_graph.add_node(entity1.name) nx_graph.add_node(entity2.name) @@ -143,52 +166,115 @@ def _collect_community_info(self, nx_graph, clusters): return dict(entity_info), dict(community_info) + def _print_cluster_info(self, entity_info, community_info): + """Print detailed information about each community cluster. + + Args: + entity_info (dict): Dictionary where keys are nodes and values are lists of cluster IDs the node belongs to. + community_info (dict): Dictionary where keys are cluster IDs and values are lists of relationship details within the cluster. + """ + print("Community Cluster Information:\n") + + for cluster_id, details in community_info.items(): + print(f"Cluster ID: {cluster_id}") + print("Nodes in this cluster:") + + # Find nodes that belong to this cluster + nodes_in_cluster = [node for node, clusters in entity_info.items() if cluster_id in clusters] + for node in nodes_in_cluster: + print(f" - Node: {node}") + + print("Relationships in this cluster:") + for detail in details: + print(f" - {detail}") + + print("\n" + "-" * 40 + "\n") + + def save_entity_info(self, entity_info: dict) -> None: + with self.driver.session() as session: + for entity_id, cluster_ids in entity_info.items(): + entity_node = EntityNode(id=entity_id, name=str(entity_id)) + for cluster_id in cluster_ids: + cluster_node = EntityNode(id=int(cluster_id), name=str(cluster_id)) + relation_metadata = {"relationship_description": "BELONGS_TO"} + rel_node = Relation( + label="BELONGS_TO", + source_id=entity_node.id, + target_id=cluster_node.id, + properties=relation_metadata, + ) + + session.run( + """ + MERGE (e:Entity {id: $entity_id, name: $entity_name}) + MERGE (c:Cluster {id: $cluster_id, name: $cluster_name}) + MERGE (e)-[r:BELONGS_TO]->(c) + ON CREATE SET r.relationship_description = $relationship_description + """, + entity_id=entity_node.id, + entity_name=entity_node.name, + cluster_id=cluster_node.id, + cluster_name=cluster_node.name, + relationship_description=relation_metadata["relationship_description"], + ) + + def read_entity_info(self) -> dict: + entity_info = {} + with self.driver.session() as session: + result = session.run( + """ + MATCH (e:Entity)-[:BELONGS_TO]->(c:Cluster) + RETURN e.id AS entity_id, collect(DISTINCT c.id) AS cluster_ids + """ + ) + for record in result: + # entity_info[record['entity_id']] = record['cluster_ids'] + entity_info[record["entity_id"]] = [int(cluster_id) for cluster_id in record["cluster_ids"]] + return entity_info + def _summarize_communities(self, community_info): """Generate and store summaries for each community.""" for community_id, details in community_info.items(): + logger.info(f"Summarizing community {community_id}") details_text = "\n".join(details) + "." # Ensure it ends with a period self.community_summary[community_id] = self.generate_community_summary(details_text) # To store summaries in neo4j - # summary = self.generate_community_summary(details_text) + summary = self.generate_community_summary(details_text) + self.store_community_summary_in_neo4j(community_id, summary) # self.community_summary[ # community_id # ] = self.store_community_summary_in_neo4j(community_id, summary) def store_community_summary_in_neo4j(self, community_id, summary): """Store the community summary in Neo4j.""" - with driver.session() as session: + with self.driver.session() as session: session.run( """ - MERGE (c:Community {id: $community_id}) + MERGE (c:Cluster {id: $community_id}) SET c.summary = $summary """, - community_id=community_id, + community_id=int(community_id), summary=summary, ) - def get_community_summaries(self): - """Returns the community summaries, building them if not already done.""" - if not self.community_summary: - self.build_communities() - return self.community_summary - - def query_community_summaries(self): - """Query and print community summaries from Neo4j.""" - with driver.session() as session: + def read_all_community_summaries(self) -> dict: + """Read all community summaries from Neo4j.""" + community_summaries = {} + with self.driver.session() as session: result = session.run( """ - MATCH (c:Community) + MATCH (c:Cluster) RETURN c.id AS community_id, c.summary AS summary - """ + """ ) for record in result: - print(f"Community ID: {record['community_id']}") - print(f"Community Summary: {record['summary']}") + community_summaries[int(record["community_id"])] = record["summary"] + return community_summaries def query_schema(self): """Query and print the schema information from Neo4j.""" - with driver.session() as session: + with self.driver.session() as session: result = session.run("CALL apoc.meta.schema()") schema = result.single()["value"] @@ -233,8 +319,8 @@ def __init__( llm: Optional[LLM] = None, extract_prompt: Optional[Union[str, PromptTemplate]] = None, parse_fn: Callable = default_parse_triplets_fn, - max_paths_per_chunk: int = 10, - num_workers: int = 4, + max_paths_per_chunk: int = 4, + num_workers: int = 10, ) -> None: """Init params.""" from llama_index.core import Settings @@ -297,6 +383,8 @@ async def _aextract(self, node: BaseNode) -> BaseNode: node.metadata[KG_NODES_KEY] = existing_nodes node.metadata[KG_RELATIONS_KEY] = existing_relations + logger.info(f"number of extracted nodes {len(existing_nodes), existing_nodes}") + logger.info(f"number of extracted relations {len(existing_relations), existing_relations}") return node async def acall(self, nodes: List[BaseNode], show_progress: bool = False, **kwargs: Any) -> List[BaseNode]: @@ -348,29 +436,6 @@ async def acall(self, nodes: List[BaseNode], show_progress: bool = False, **kwar relationship_pattern = r'\("relationship"\$\$\$\$(.+?)\$\$\$\$(.+?)\$\$\$\$(.+?)\$\$\$\$(.+?)\)' -def inspect_db(): - try: - with driver.session() as session: - # Check for property keys - result = session.run("CALL db.propertyKeys()") - property_keys = [record["propertyKey"] for record in result] - print("Property Keys:", property_keys) - - # Check for node labels - result = session.run("CALL db.labels()") - labels = [record["label"] for record in result] - print("Node Labels:", labels) - - # Check for relationship types - result = session.run("CALL db.relationshipTypes()") - relationship_types = [record["relationshipType"] for record in result] - print("Relationship Types:", relationship_types) - except Exception as e: - print(f"Error: {e}") - finally: - driver.close() - - def parse_fn(response_str: str) -> Any: entities = re.findall(entity_pattern, response_str) relationships = re.findall(relationship_pattern, response_str) @@ -379,17 +444,18 @@ def parse_fn(response_str: str) -> Any: return entities, relationships -def get_model_name_from_tgi_endpoint(url): +def get_attribute_from_tgi_endpoint(url, attribute_name): + """Get a specific attribute from the TGI endpoint.""" try: response = requests.get(f"{url}/info") response.raise_for_status() # Ensure we notice bad responses try: model_info = response.json() - model_name = model_info.get("model_id") - if model_name: - return model_name + attribute_value = model_info.get(attribute_name) + if attribute_value is not None: + return attribute_value else: - logger.error(f"model_id not found in the response from {url}") + logger.error(f"{attribute_name} not found in the response from {url}") return None except ValueError: logger.error(f"Invalid JSON response from {url}") @@ -399,11 +465,32 @@ def get_model_name_from_tgi_endpoint(url): return None +def trim_messages_to_token_limit(tokenizer, messages, max_tokens): + """Trim the messages to fit within the token limit.""" + total_tokens = 0 + trimmed_messages = [] + + for message in messages: + tokens = tokenizer.tokenize(message.content) + total_tokens += len(tokens) + if total_tokens > max_tokens: + # Trim the message to fit within the remaining token limit + logger.info(f"Trimming messages: {total_tokens} > {max_tokens}") + remaining_tokens = max_tokens - (total_tokens - len(tokens)) + tokens = tokens[:remaining_tokens] + message.content = tokenizer.convert_tokens_to_string(tokens) + trimmed_messages.append(message) + break + else: + trimmed_messages.append(message) + + return trimmed_messages + + logger = CustomLogger("prepare_doc_neo4j") logflag = os.getenv("LOGFLAG", False) upload_folder = "./uploaded_files/" -driver = GraphDatabase.driver(NEO4J_URL, auth=(NEO4J_USERNAME, NEO4J_PASSWORD)) client = OpenAI() @@ -465,18 +552,19 @@ def ingest_data_to_neo4j(doc_path: DocPath): logger.info(f"An error occurred while verifying the API Key: {e}") else: logger.info("NO OpenAI API Key. TGI/TEI endpoints will be used.") - llm_name = get_model_name_from_tgi_endpoint(TGI_LLM_ENDPOINT) + llm_name = get_attribute_from_tgi_endpoint(TGI_LLM_ENDPOINT, "model_id") llm = TextGenerationInference( model_url=TGI_LLM_ENDPOINT, model_name=llm_name, temperature=0.7, - max_tokens=1512, # 512otherwise too shor + max_tokens=1512, + timeout=600, # timeout in seconds ) - emb_name = get_model_name_from_tgi_endpoint(TEI_EMBEDDING_ENDPOINT) + emb_name = get_attribute_from_tgi_endpoint(TEI_EMBEDDING_ENDPOINT, "model_id") embed_model = TextEmbeddingsInference( base_url=TEI_EMBEDDING_ENDPOINT, model_name=emb_name, - timeout=60, # timeout in seconds + timeout=600, # timeout in seconds embed_batch_size=10, # batch size for embedding ) Settings.embed_model = embed_model @@ -498,16 +586,23 @@ def ingest_data_to_neo4j(doc_path: DocPath): embed_model=embed_model or Settings.embed_model, show_progress=True, ) - inspect_db() if logflag: + logger.info("The graph is built.") logger.info(f"Total number of triplets {len(index.property_graph_store.get_triplets())}") - # index.property_graph_store.build_communities() - # print("done building communities") - if logflag: - logger.info("The graph is built.") + logger.info("Done building communities.") + + return index + +def build_communities(index: PropertyGraphIndex): + try: + index.property_graph_store.build_communities() + if logflag: + logger.info("Done building communities.") + except Exception as e: + logger.error(f"Error building communities: {e}") return True @@ -539,7 +634,7 @@ async def ingest_documents( encode_file = encode_filename(file.filename) save_path = upload_folder + encode_file await save_content_to_local_disk(save_path, file) - ingest_data_to_neo4j( + index = ingest_data_to_neo4j( DocPath( path=save_path, chunk_size=chunk_size, @@ -551,10 +646,6 @@ async def ingest_documents( uploaded_files.append(save_path) if logflag: logger.info(f"Successfully saved file {save_path}") - result = {"status": 200, "message": "Data preparation succeeded"} - if logflag: - logger.info(result) - return result if link_list: link_list = json.loads(link_list) # Parse JSON string to list @@ -566,7 +657,7 @@ async def ingest_documents( content = parse_html([link])[0][0] try: await save_content_to_local_disk(save_path, content) - ingest_data_to_neo4j( + index = ingest_data_to_neo4j( DocPath( path=save_path, chunk_size=chunk_size, @@ -576,17 +667,19 @@ async def ingest_documents( ) ) except json.JSONDecodeError: - raise HTTPException(status_code=500, detail="Fail to ingest data into qdrant.") + raise HTTPException(status_code=500, detail="Fail to ingest data") if logflag: logger.info(f"Successfully saved link {link}") + if files or link_list: + build_communities(index) result = {"status": 200, "message": "Data preparation succeeded"} if logflag: logger.info(result) return result - - raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") + else: + raise HTTPException(status_code=400, detail="Must provide either a file or a string list.") if __name__ == "__main__": diff --git a/comps/dataprep/neo4j/llama_index/set_env.sh b/comps/dataprep/neo4j/llama_index/set_env.sh index dd5d2a15d..58980ebbe 100644 --- a/comps/dataprep/neo4j/llama_index/set_env.sh +++ b/comps/dataprep/neo4j/llama_index/set_env.sh @@ -8,7 +8,7 @@ export EMBEDDING_MODEL_ID="BAAI/bge-base-en-v1.5" export OPENAI_EMBEDDING_MODEL="text-embedding-3-small" -export LLM_MODEL_ID="Intel/neural-chat-7b-v3-3" +export LLM_MODEL_ID="meta-llama/Meta-Llama-3-8B-Instruct" export OPENAI_LLM_MODEL="gpt-4o" export TEI_EMBEDDING_ENDPOINT="http://${host_ip}:6006" export TGI_LLM_ENDPOINT="http://${host_ip}:6005" diff --git a/comps/nginx/nginx.conf.template b/comps/nginx/nginx.conf.template index 281db02c2..f0d0f8d6b 100644 --- a/comps/nginx/nginx.conf.template +++ b/comps/nginx/nginx.conf.template @@ -43,6 +43,10 @@ server { proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; + proxy_connect_timeout 6000; + proxy_send_timeout 6000; + proxy_read_timeout 6000; + send_timeout 6000; } location /v1/dataprep/get_file { diff --git a/comps/retrievers/neo4j/llama_index/README.md b/comps/retrievers/neo4j/llama_index/README.md index 05368a3d0..eb6f01579 100644 --- a/comps/retrievers/neo4j/llama_index/README.md +++ b/comps/retrievers/neo4j/llama_index/README.md @@ -1,11 +1,11 @@ # Retriever Microservice with Neo4J -This retrieval miicroservice is intended for use in GraphRAG pipeline and assumes a GraphRAGStore exists. +This retrieval microservice is intended for use in GraphRAG pipeline and assumes a GraphRAGStore containing document graph, entity_info and Community Symmaries already exist. Please refer to the GenAIExamples/GraphRAG example. + Retrieval follows these steps: -- Performs hierarchical_leiden clustering to identify communities in the knowledge graph -- Performs similarty to find the relevant entities to the input query -- Generates a community symmary for each community +- Uses similarty to find the relevant entities to the input query. Retrieval is done over the neo4j index that natively supports embeddings. +- Uses Cypher queries to retrieve the community summaries for all the communities the entities belong to. - Generates a partial answer to the query for each community summary. This will later be used as context to generate a final query response. Please refer to [GenAIExamples/GraphRAG](https://github.com/opea-project/GenAIExamples). ## 🚀Start Microservice with Docker @@ -37,7 +37,7 @@ source ./set_env.sh ### 3. Run Docker with Docker Compose -Docker compose will start 5 microservices: retriever-neo4j-llamaindex, dataprep-neo4j-llamaindex, neo4j-apoc, tgi-gaudi-service and tei-embedding-service. The reason TGI and TEI are needed is because retriever relies on LLM to extract community summaries from the community triplets that are identified as relevant to the input query. Neo4j database supports embeddings natively so we do not need a separate vector store. Checkout the blog [Introducing the Property Graph Index: A Powerful New Way to Build Knowledge Graphs with LLMs](https://www.llamaindex.ai/blog/introducing-the-property-graph-index-a-powerful-new-way-to-build-knowledge-graphs-with-llms) for a better understanding of Property Graph Store and Index. +Docker compose will start 5 microservices: retriever-neo4j-llamaindex, dataprep-neo4j-llamaindex, neo4j-apoc, tgi-gaudi-service and tei-embedding-service. Neo4j database supports embeddings natively so we do not need a separate vector store. Checkout the blog [Introducing the Property Graph Index: A Powerful New Way to Build Knowledge Graphs with LLMs](https://www.llamaindex.ai/blog/introducing-the-property-graph-index-a-powerful-new-way-to-build-knowledge-graphs-with-llms) for a better understanding of Property Graph Store and Index. ```bash cd comps/retrievers/neo4j/llama_index diff --git a/comps/retrievers/neo4j/llama_index/retriever_community_answers_neo4j.py b/comps/retrievers/neo4j/llama_index/retriever_community_answers_neo4j.py index 837cd87e9..830dc2775 100644 --- a/comps/retrievers/neo4j/llama_index/retriever_community_answers_neo4j.py +++ b/comps/retrievers/neo4j/llama_index/retriever_community_answers_neo4j.py @@ -26,6 +26,7 @@ from llama_index.embeddings.text_embeddings_inference import TextEmbeddingsInference from llama_index.llms.openai import OpenAI from llama_index.llms.text_generation_inference import TextGenerationInference +from neo4j import GraphDatabase from pydantic import BaseModel, PrivateAttr from comps import ( @@ -45,7 +46,7 @@ RetrievalResponse, RetrievalResponseData, ) -from comps.dataprep.neo4j.llama_index.extract_graph_neo4j import GraphRAGStore, get_model_name_from_tgi_endpoint +from comps.dataprep.neo4j.llama_index.extract_graph_neo4j import GraphRAGStore, get_attribute_from_tgi_endpoint logger = CustomLogger("retriever_neo4j") logflag = os.getenv("LOGFLAG", False) @@ -70,15 +71,16 @@ def custom_query(self, query_str: str) -> RetrievalResponseData: """Process all community summaries to generate answers to a specific query.""" entities = self.get_entities(query_str, self._similarity_top_k) - - community_ids = self.retrieve_entity_communities(self._graph_store.entity_info, entities) - community_summaries = self._graph_store.get_community_summaries() + entity_info = self._graph_store.read_entity_info() + community_ids = self.retrieve_entity_communities(entity_info, entities) + community_summaries = self.retrieve_community_summaries_cypher(entities) + community_ids = list(community_summaries.keys()) if logflag: logger.info(f"Community ids: {community_ids}") + # community_summaries of relevant communities community_answers = [ self.generate_answer_from_summary(community_summary, query_str) for id, community_summary in community_summaries.items() - if id in community_ids ] # Convert answers to RetrievalResponseData objects response_data = [RetrievalResponseData(text=answer, metadata={}) for answer in community_answers] @@ -94,14 +96,16 @@ def get_entities(self, query_str, similarity_top_k): similarity_top_k=self._similarity_top_k, # similarity_score=0.6 ) - nodes_retrieved = self._index.as_retriever( - sub_retrievers=[vecContext_retriever], similarity_top_k=self._similarity_top_k - ).retrieve(query_str) + # nodes_retrieved = self._index.as_retriever( + # sub_retrievers=[vecContext_retriever], similarity_top_k=self._similarity_top_k + # ).retrieve(query_str) # if subretriever not specified it will use LLMSynonymRetriever with Settings.llm model - # nodes_retrieved = self._index.as_retriever(similarity_top_k=self._similarity_top_k).retrieve(query_str) + nodes_retrieved = self._index.as_retriever(similarity_top_k=self._similarity_top_k).retrieve(query_str) entities = set() pattern = r"(\w+(?:\s+\w+)*)\s*->\s*(\w+(?:\s+\w+)*)\s*->\s*(\w+(?:\s+\w+)*)" - + if logflag: + logger.info(f" len of triplets {len(self._index.property_graph_store.get_triplets())}") + logger.info(f"number of nodes retrieved {len(nodes_retrieved), nodes_retrieved}") for node in nodes_retrieved: matches = re.findall(pattern, node.text, re.DOTALL) @@ -132,6 +136,32 @@ def retrieve_entity_communities(self, entity_info, entities): return list(set(community_ids)) + def retrieve_community_summaries_cypher(self, entities): + """Retrieve cluster information and summaries for given entities using a Cypher query. + + Args: + entities (list): List of entity names to retrieve information for. + + Returns: + dict: Dictionary where keys are community or cluster IDs and values are summaries. + """ + community_summaries = {} + print(f"driver working? {self._graph_store.driver})") + + with self._graph_store.driver.session() as session: + for entity in entities: + result = session.run( + """ + MATCH (e:Entity {id: $entity_id})-[:BELONGS_TO]->(c:Cluster) + RETURN c.id AS cluster_id, c.summary AS summary + """, + entity_id=entity, + ) + for record in result: + community_summaries[record["cluster_id"]] = record["summary"] + + return community_summaries + def generate_answer_from_summary(self, community_summary, query): """Generate an answer from a community summary based on a given query using LLM.""" prompt = ( @@ -162,7 +192,11 @@ async def retrieve(input: Union[ChatCompletionRequest]) -> Union[ChatCompletionR if logflag: logger.info(input) start = time.time() - query = input.messages[0]["content"] + + if isinstance(input.messages, str): + query = input.messages + else: + query = input.messages[0]["content"] logger.info(f"Query received in retriever: {query}") if OPENAI_API_KEY: @@ -178,14 +212,14 @@ async def retrieve(input: Union[ChatCompletionRequest]) -> Union[ChatCompletionR logger.info(f"An error occurred while verifying the API Key: {e}") else: logger.info("No OpenAI API KEY provided. Will use TGI and TEI endpoints") - llm_name = get_model_name_from_tgi_endpoint(TGI_LLM_ENDPOINT) + llm_name = get_attribute_from_tgi_endpoint(TGI_LLM_ENDPOINT, "model_id") llm = TextGenerationInference( model_url=TGI_LLM_ENDPOINT, model_name=llm_name, temperature=0.7, max_tokens=1512, # 512otherwise too shor ) - emb_name = get_model_name_from_tgi_endpoint(TEI_EMBEDDING_ENDPOINT) + emb_name = get_attribute_from_tgi_endpoint(TEI_EMBEDDING_ENDPOINT, "model_id") embed_model = TextEmbeddingsInference( base_url=TEI_EMBEDDING_ENDPOINT, model_name=emb_name, @@ -202,7 +236,7 @@ async def retrieve(input: Union[ChatCompletionRequest]) -> Union[ChatCompletionR embed_model=embed_model or Settings.embed_model, embed_kg_nodes=True, ) - index.property_graph_store.build_communities() + query_engine = GraphRAGQueryEngine( graph_store=index.property_graph_store, llm=llm, diff --git a/tests/dataprep/test_dataprep_neo4j_llama_index_on_intel_hpu.sh b/tests/dataprep/test_dataprep_neo4j_llama_index_on_intel_hpu.sh index 81b716993..aeacdea0a 100755 --- a/tests/dataprep/test_dataprep_neo4j_llama_index_on_intel_hpu.sh +++ b/tests/dataprep/test_dataprep_neo4j_llama_index_on_intel_hpu.sh @@ -48,7 +48,7 @@ function start_service() { docker run -d --name="test-comps-dataprep-neo4j-server" -p 6004:6004 -v ./data:/data --ipc=host -e TGI_LLM_ENDPOINT=$TGI_LLM_ENDPOINT \ -e TEI_EMBEDDING_ENDPOINT=$TEI_EMBEDDING_ENDPOINT -e EMBEDDING_MODEL_ID=$emb_model -e LLM_MODEL_ID=$model -e host_ip=$ip_address -e no_proxy=$no_proxy \ -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e NEO4J_URI="bolt://${ip_address}:7687" -e NEO4J_USERNAME="neo4j" \ - -e NEO4J_PASSWORD="neo4jtest" -e LOGFLAG=True opea/dataprep-neo4j-llamaindex:comps + -e NEO4J_PASSWORD="neo4jtest" -e HF_TOKEN=$HF_TOKEN -e LOGFLAG=True opea/dataprep-neo4j-llamaindex:comps sleep 30s export DATAPREP_SERVICE_ENDPOINT="http://${ip_address}:6004" @@ -122,16 +122,6 @@ function validate_microservice() { "test-comps-dataprep-neo4j-server" } -function kill_process_on_port() { - local port=$1 - local pid=$(lsof -t -i:$port) - if [[ ! -z "$pid" ]]; then - echo "Killing process $pid on port $port" - kill -9 $pid - else - echo "No process found on port $port" - fi -} function stop_docker() { cid_retrievers=$(docker ps -aq --filter "name=test-comps-dataprep-neo4j*") @@ -145,7 +135,6 @@ function stop_docker() { } function main() { - kill_process_on_port 6006 stop_docker diff --git a/tests/retrievers/test_retrievers_neo4j_llama_index_on_intel_hpu.sh b/tests/retrievers/test_retrievers_neo4j_llama_index_on_intel_hpu.sh index e4606bde8..3d252ffc1 100755 --- a/tests/retrievers/test_retrievers_neo4j_llama_index_on_intel_hpu.sh +++ b/tests/retrievers/test_retrievers_neo4j_llama_index_on_intel_hpu.sh @@ -68,7 +68,7 @@ function start_service() { docker run -d --name="test-comps-retrievers-neo4j-llama-index-dataprep" -p 6004:6004 -v ./data:/data --ipc=host -e TGI_LLM_ENDPOINT=$TGI_LLM_ENDPOINT \ -e TEI_EMBEDDING_ENDPOINT=$TEI_EMBEDDING_ENDPOINT -e EMBEDDING_MODEL_ID=$emb_model -e LLM_MODEL_ID=$model -e host_ip=$ip_address -e no_proxy=$no_proxy \ -e http_proxy=$http_proxy -e https_proxy=$https_proxy -e NEO4J_URI="bolt://${ip_address}:7687" -e NEO4J_USERNAME="neo4j" \ - -e NEO4J_PASSWORD="neo4jtest" -e LOGFLAG=True opea/dataprep-neo4j-llamaindex:comps + -e NEO4J_PASSWORD="neo4jtest" -e HF_TOKEN=$HF_TOKEN -e LOGFLAG=True opea/dataprep-neo4j-llamaindex:comps sleep 30s export DATAPREP_SERVICE_ENDPOINT="http://${ip_address}:6004"