From d68a3be32e5a4c720df45e07098496f63cd53e7b Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 11 Oct 2024 13:23:11 +0200 Subject: [PATCH 01/32] feat: Add config support for pgvector Added config support for using pgvector Feature #COG-170 --- .env.template | 6 +++--- README.md | 4 ++-- cognee/api/client.py | 2 +- .../infrastructure/databases/vector/create_vector_engine.py | 2 ++ cognee/modules/settings/get_settings.py | 3 +++ cognee/modules/settings/save_vector_db_config.py | 2 +- docker-compose.yml | 2 +- 7 files changed, 13 insertions(+), 8 deletions(-) diff --git a/.env.template b/.env.template index 43cdedee..ada92ad3 100644 --- a/.env.template +++ b/.env.template @@ -7,19 +7,19 @@ GRAPHISTRY_PASSWORD= SENTRY_REPORTING_URL= -GRAPH_DATABASE_PROVIDER="neo4j" # or "networkx" +GRAPH_DATABASE_PROVIDER="neo4j" # "neo4j" or "networkx" # Not needed if using networkx GRAPH_DATABASE_URL= GRAPH_DATABASE_USERNAME= GRAPH_DATABASE_PASSWORD= -VECTOR_ENGINE_PROVIDER="qdrant" # or "weaviate" or "lancedb" +VECTOR_ENGINE_PROVIDER="qdrant" # or "qdrant", "pgvector", "weaviate" or "lancedb" # Not needed if using "lancedb" VECTOR_DB_URL= VECTOR_DB_KEY= # Database provider -DB_PROVIDER="sqlite" # or "postgres" +DB_PROVIDER="sqlite" # "sqlite" or "postgres" # Database name DB_NAME=cognee_db diff --git a/README.md b/README.md index 7255bf75..c1dfdb0a 100644 --- a/README.md +++ b/README.md @@ -190,11 +190,11 @@ Cognee supports a variety of tools and services for different operations: - **Local Setup**: By default, LanceDB runs locally with NetworkX and OpenAI. -- **Vector Stores**: Cognee supports Qdrant and Weaviate for vector storage. +- **Vector Stores**: Cognee supports LanceDB, Qdrant, PGVector and Weaviate for vector storage. - **Language Models (LLMs)**: You can use either Anyscale or Ollama as your LLM provider. -- **Graph Stores**: In addition to LanceDB, Neo4j is also supported for graph storage. +- **Graph Stores**: In addition to NetworkX, Neo4j is also supported for graph storage. 
- **User management**: Create individual user graphs and manage permissions diff --git a/cognee/api/client.py b/cognee/api/client.py index 6a0759cd..7853f4d0 100644 --- a/cognee/api/client.py +++ b/cognee/api/client.py @@ -362,7 +362,7 @@ class LLMConfig(BaseModel): apiKey: str class VectorDBConfig(BaseModel): - provider: Union[Literal["lancedb"], Literal["qdrant"], Literal["weaviate"]] + provider: Union[Literal["lancedb"], Literal["qdrant"], Literal["weaviate"], Literal["pgvector"]] url: str apiKey: str diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index 19859ae4..e3571152 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -26,6 +26,8 @@ def create_vector_engine(config: VectorConfig, embedding_engine): api_key = config["vector_db_key"], embedding_engine = embedding_engine ) + elif config["vector_db_provider"] == "pgvector": + pass else: from .lancedb.LanceDBAdapter import LanceDBAdapter diff --git a/cognee/modules/settings/get_settings.py b/cognee/modules/settings/get_settings.py index 2cc4650a..6b2edd55 100644 --- a/cognee/modules/settings/get_settings.py +++ b/cognee/modules/settings/get_settings.py @@ -13,6 +13,9 @@ def get_settings(): }, { "value": "lancedb", "label": "LanceDB", + }, { + "value": "pgvector", + "label": "PGVector", }] vector_config = get_vectordb_config() diff --git a/cognee/modules/settings/save_vector_db_config.py b/cognee/modules/settings/save_vector_db_config.py index bef5f470..b1546e17 100644 --- a/cognee/modules/settings/save_vector_db_config.py +++ b/cognee/modules/settings/save_vector_db_config.py @@ -5,7 +5,7 @@ class VectorDBConfig(BaseModel): url: str apiKey: str - provider: Union[Literal["lancedb"], Literal["qdrant"], Literal["weaviate"]] + provider: Union[Literal["lancedb"], Literal["qdrant"], Literal["weaviate"], Literal["pgvector"]] async def save_vector_db_config(vector_db_config: VectorDBConfig): vector_config = get_vectordb_config() diff --git a/docker-compose.yml b/docker-compose.yml index 2ef05170..426b178a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -62,7 +62,7 @@ services: - cognee-network postgres: - image: postgres:latest + image: pgvector/pgvector:pg17 container_name: postgres environment: POSTGRES_USER: cognee From c62dfdda9b6f8a1ab8230fe2e8964538db030915 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 11 Oct 2024 15:00:28 +0200 Subject: [PATCH 02/32] feat: Add PGVectorAdapter Added PGVectorAdapter Feature #COG-170 --- .env.template | 10 +- .../infrastructure/databases/vector/config.py | 2 + .../databases/vector/create_vector_engine.py | 18 +- .../vector/pgvector/PGVectorAdapter.py | 170 ++++++++++++++++++ .../databases/vector/pgvector/__init__.py | 0 5 files changed, 195 insertions(+), 5 deletions(-) create mode 100644 cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py create mode 100644 cognee/infrastructure/databases/vector/pgvector/__init__.py diff --git a/.env.template b/.env.template index ada92ad3..c6194a64 100644 --- a/.env.template +++ b/.env.template @@ -14,17 +14,19 @@ GRAPH_DATABASE_USERNAME= GRAPH_DATABASE_PASSWORD= VECTOR_ENGINE_PROVIDER="qdrant" # or "qdrant", "pgvector", "weaviate" or "lancedb" -# Not needed if using "lancedb" +# Not needed if using "lancedb" or "pgvector" VECTOR_DB_URL= VECTOR_DB_KEY= +# Needed if using "pgvector" +VECTOR_DB_NAME= -# Database provider +# Relational Database provider 
DB_PROVIDER="sqlite" # "sqlite" or "postgres" -# Database name +# Relational Database name DB_NAME=cognee_db -# Postgres specific parameters (Only if Postgres is run) +# Postgres specific parameters (Only if Postgres or PGVector is run) DB_HOST=127.0.0.1 DB_PORT=5432 DB_USERNAME=cognee diff --git a/cognee/infrastructure/databases/vector/config.py b/cognee/infrastructure/databases/vector/config.py index 8137a067..5e403c92 100644 --- a/cognee/infrastructure/databases/vector/config.py +++ b/cognee/infrastructure/databases/vector/config.py @@ -10,6 +10,7 @@ class VectorConfig(BaseSettings): ) vector_db_key: str = "" vector_engine_provider: str = "lancedb" + vector_db_name: str = "cognee_vector_db" model_config = SettingsConfigDict(env_file = ".env", extra = "allow") @@ -18,6 +19,7 @@ def to_dict(self) -> dict: "vector_db_url": self.vector_db_url, "vector_db_key": self.vector_db_key, "vector_db_provider": self.vector_engine_provider, + "vector_db_name": self.vector_db_name, } @lru_cache diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index e3571152..578a601f 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -1,9 +1,12 @@ from typing import Dict +from .config import get_relational_config + class VectorConfig(Dict): vector_db_url: str vector_db_key: str vector_db_provider: str + vector_db_name: str def create_vector_engine(config: VectorConfig, embedding_engine): if config["vector_db_provider"] == "weaviate": @@ -27,7 +30,20 @@ def create_vector_engine(config: VectorConfig, embedding_engine): embedding_engine = embedding_engine ) elif config["vector_db_provider"] == "pgvector": - pass + from .pgvector import PGVectorAdapter + + # Get configuration for postgres database + relational_config = get_relational_config() + db_username = relational_config.db_username + db_password = relational_config.db_password + db_host = relational_config.db_host + db_port = relational_config.db_port + + # Get name of vector database + db_name = config["vector_db_name"] + + connection_string = f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" + return PGVectorAdapter(connection_string) else: from .lancedb.LanceDBAdapter import LanceDBAdapter diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py new file mode 100644 index 00000000..db558721 --- /dev/null +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -0,0 +1,170 @@ +from typing import List, Optional, get_type_hints, Generic, TypeVar +import asyncio +from ..models.ScoredResult import ScoredResult + +from ..vector_db_interface import VectorDBInterface, DataPoint +from ..embeddings.EmbeddingEngine import EmbeddingEngine +from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker + + +# Define the models +class Base(DeclarativeBase): + pass + +class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): + def __init__(self, connection_string: str, + api_key: Optional[str], + embedding_engine: EmbeddingEngine + ): + self.api_key = api_key + self.embedding_engine = embedding_engine + self.db_uri: str = connection_string + + self.engine = create_async_engine(connection_string) + self.sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False) + + # Create pgvector extension in postgres + 
with engine.begin() as connection: + connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + + + async def embed_data(self, data: list[str]) -> list[list[float]]: + return await self.embedding_engine.embed_text(data) + + async def has_collection(self, collection_name: str) -> bool: + connection = await self.get_async_session() + collection_names = await connection.table_names() + return collection_name in collection_names + + async def create_collection(self, collection_name: str, payload_schema = None): + data_point_types = get_type_hints(DataPoint) + vector_size = self.embedding_engine.get_vector_size() + + class PGVectorDataPoint(Base): + id: Mapped[int] = mapped_column(data_point_types["id"], primary_key=True) + vector = mapped_column(Vector(vector_size)) + payload: mapped_column(payload_schema) + + if not await self.has_collection(collection_name): + connection = await self.get_async_session() + return await connection.create_table( + name = collection_name, + schema = PGVectorDataPoint, + exist_ok = True, + ) + + async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): + connection = await self.get_async_session() + + if not await self.has_collection(collection_name): + await self.create_collection( + collection_name, + payload_schema = type(data_points[0].payload), + ) + + collection = await connection.open_table(collection_name) + + data_vectors = await self.embed_data( + [data_point.get_embeddable_data() for data_point in data_points] + ) + + IdType = TypeVar("IdType") + PayloadSchema = TypeVar("PayloadSchema") + vector_size = self.embedding_engine.get_vector_size() + + class PGVectorDataPoint(Base, Generic[IdType, PayloadSchema]): + id: Mapped[int] = mapped_column(IdType, primary_key=True) + vector = mapped_column(Vector(vector_size)) + payload: mapped_column(PayloadSchema) + + pgvector_data_points = [ + PGVectorDataPoint[type(data_point.id), type(data_point.payload)]( + id = data_point.id, + vector = data_vectors[data_index], + payload = data_point.payload, + ) for (data_index, data_point) in enumerate(data_points) + ] + + await collection.add(pgvector_data_points) + + async def retrieve(self, collection_name: str, data_point_ids: list[str]): + connection = await self.get_async_session() + collection = await connection.open_table(collection_name) + + if len(data_point_ids) == 1: + results = await collection.query().where(f"id = '{data_point_ids[0]}'").to_pandas() + else: + results = await collection.query().where(f"id IN {tuple(data_point_ids)}").to_pandas() + + return [ScoredResult( + id = result["id"], + payload = result["payload"], + score = 0, + ) for result in results.to_dict("index").values()] + + async def search( + self, + collection_name: str, + query_text: str = None, + query_vector: List[float] = None, + limit: int = 5, + with_vector: bool = False, + ): + if query_text is None and query_vector is None: + raise ValueError("One of query_text or query_vector must be provided!") + + if query_text and not query_vector: + query_vector = (await self.embedding_engine.embed_text([query_text]))[0] + + connection = await self.get_async_session() + collection = await connection.open_table(collection_name) + + results = await collection.vector_search(query_vector).limit(limit).to_pandas() + + result_values = list(results.to_dict("index").values()) + + min_value = 100 + max_value = 0 + + for result in result_values: + value = float(result["_distance"]) + if value > max_value: + max_value = value + if value < min_value: + min_value = value 
+ + normalized_values = [(result["_distance"] - min_value) / (max_value - min_value) for result in result_values] + + return [ScoredResult( + id = str(result["id"]), + payload = result["payload"], + score = normalized_values[value_index], + ) for value_index, result in enumerate(result_values)] + + async def batch_search( + self, + collection_name: str, + query_texts: List[str], + limit: int = None, + with_vectors: bool = False, + ): + query_vectors = await self.embedding_engine.embed_text(query_texts) + + return asyncio.gather( + *[self.search( + collection_name = collection_name, + query_vector = query_vector, + limit = limit, + with_vector = with_vectors, + ) for query_vector in query_vectors] + ) + + async def delete_data_points(self, collection_name: str, data_point_ids: list[str]): + connection = await self.get_async_session() + collection = await connection.open_table(collection_name) + results = await collection.delete(f"id IN {tuple(data_point_ids)}") + return results + + async def prune(self): + # Clean up the database if it was set up as temporary + self.delete_database() diff --git a/cognee/infrastructure/databases/vector/pgvector/__init__.py b/cognee/infrastructure/databases/vector/pgvector/__init__.py new file mode 100644 index 00000000..e69de29b From 268396abdcf811cbb6f84aa18bc35e3fc68cfad7 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 11 Oct 2024 17:11:05 +0200 Subject: [PATCH 03/32] feature: Checkpoint during pgvector integration development Saving state of pgvector integration development so far Feature #COG-170 --- .../databases/vector/create_vector_engine.py | 9 +- .../vector/pgvector/PGVectorAdapter.py | 150 +++++++++--------- .../databases/vector/pgvector/__init__.py | 1 + 3 files changed, 84 insertions(+), 76 deletions(-) diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index 578a601f..2399eac0 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -1,6 +1,6 @@ from typing import Dict -from .config import get_relational_config +from ..relational.config import get_relational_config class VectorConfig(Dict): vector_db_url: str @@ -30,7 +30,7 @@ def create_vector_engine(config: VectorConfig, embedding_engine): embedding_engine = embedding_engine ) elif config["vector_db_provider"] == "pgvector": - from .pgvector import PGVectorAdapter + from .pgvector.PGVectorAdapter import PGVectorAdapter # Get configuration for postgres database relational_config = get_relational_config() @@ -43,7 +43,10 @@ def create_vector_engine(config: VectorConfig, embedding_engine): db_name = config["vector_db_name"] connection_string = f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" - return PGVectorAdapter(connection_string) + return PGVectorAdapter(connection_string, + config["vector_db_key"], + embedding_engine + ) else: from .lancedb.LanceDBAdapter import LanceDBAdapter diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index db558721..8b79fb9d 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -6,12 +6,21 @@ from ..embeddings.EmbeddingEngine import EmbeddingEngine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from 
sqlalchemy.orm import DeclarativeBase, mapped_column +from pgvector.sqlalchemy import Vector + +from ...relational.sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter + # Define the models class Base(DeclarativeBase): pass class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): + async def create_vector_extension(self): + async with self.get_async_session() as session: + await session.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) + def __init__(self, connection_string: str, api_key: Optional[str], embedding_engine: EmbeddingEngine @@ -22,19 +31,15 @@ def __init__(self, connection_string: str, self.engine = create_async_engine(connection_string) self.sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False) - - # Create pgvector extension in postgres - with engine.begin() as connection: - connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - + self.create_vector_extension() async def embed_data(self, data: list[str]) -> list[list[float]]: return await self.embedding_engine.embed_text(data) async def has_collection(self, collection_name: str) -> bool: - connection = await self.get_async_session() - collection_names = await connection.table_names() - return collection_name in collection_names + async with self.engine.begin() as connection: + collection_names = await connection.table_names() + return collection_name in collection_names async def create_collection(self, collection_name: str, payload_schema = None): data_point_types = get_type_hints(DataPoint) @@ -46,61 +51,60 @@ class PGVectorDataPoint(Base): payload: mapped_column(payload_schema) if not await self.has_collection(collection_name): - connection = await self.get_async_session() - return await connection.create_table( - name = collection_name, - schema = PGVectorDataPoint, - exist_ok = True, - ) + async with self.engine.begin() as connection: + return await connection.create_table( + name = collection_name, + schema = PGVectorDataPoint, + exist_ok = True, + ) async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): - connection = await self.get_async_session() + async with self.engine.begin() as connection: + if not await self.has_collection(collection_name): + await self.create_collection( + collection_name, + payload_schema = type(data_points[0].payload), + ) - if not await self.has_collection(collection_name): - await self.create_collection( - collection_name, - payload_schema = type(data_points[0].payload), - ) - - collection = await connection.open_table(collection_name) + collection = await connection.open_table(collection_name) - data_vectors = await self.embed_data( - [data_point.get_embeddable_data() for data_point in data_points] - ) + data_vectors = await self.embed_data( + [data_point.get_embeddable_data() for data_point in data_points] + ) - IdType = TypeVar("IdType") - PayloadSchema = TypeVar("PayloadSchema") - vector_size = self.embedding_engine.get_vector_size() + IdType = TypeVar("IdType") + PayloadSchema = TypeVar("PayloadSchema") + vector_size = self.embedding_engine.get_vector_size() - class PGVectorDataPoint(Base, Generic[IdType, PayloadSchema]): - id: Mapped[int] = mapped_column(IdType, primary_key=True) - vector = mapped_column(Vector(vector_size)) - payload: mapped_column(PayloadSchema) + class PGVectorDataPoint(Base, Generic[IdType, PayloadSchema]): + id: Mapped[int] = mapped_column(IdType, primary_key=True) + vector = mapped_column(Vector(vector_size)) + payload: mapped_column(PayloadSchema) - pgvector_data_points = [ - 
PGVectorDataPoint[type(data_point.id), type(data_point.payload)]( - id = data_point.id, - vector = data_vectors[data_index], - payload = data_point.payload, - ) for (data_index, data_point) in enumerate(data_points) - ] + pgvector_data_points = [ + PGVectorDataPoint[type(data_point.id), type(data_point.payload)]( + id = data_point.id, + vector = data_vectors[data_index], + payload = data_point.payload, + ) for (data_index, data_point) in enumerate(data_points) + ] - await collection.add(pgvector_data_points) + await collection.add(pgvector_data_points) async def retrieve(self, collection_name: str, data_point_ids: list[str]): - connection = await self.get_async_session() - collection = await connection.open_table(collection_name) + async with self.engine.begin() as connection: + collection = await connection.open_table(collection_name) - if len(data_point_ids) == 1: - results = await collection.query().where(f"id = '{data_point_ids[0]}'").to_pandas() - else: - results = await collection.query().where(f"id IN {tuple(data_point_ids)}").to_pandas() + if len(data_point_ids) == 1: + results = await collection.query().where(f"id = '{data_point_ids[0]}'").to_pandas() + else: + results = await collection.query().where(f"id IN {tuple(data_point_ids)}").to_pandas() - return [ScoredResult( - id = result["id"], - payload = result["payload"], - score = 0, - ) for result in results.to_dict("index").values()] + return [ScoredResult( + id = result["id"], + payload = result["payload"], + score = 0, + ) for result in results.to_dict("index").values()] async def search( self, @@ -116,30 +120,30 @@ async def search( if query_text and not query_vector: query_vector = (await self.embedding_engine.embed_text([query_text]))[0] - connection = await self.get_async_session() - collection = await connection.open_table(collection_name) + async with self.engine.begin() as connection: + collection = await connection.open_table(collection_name) - results = await collection.vector_search(query_vector).limit(limit).to_pandas() + results = await collection.vector_search(query_vector).limit(limit).to_pandas() - result_values = list(results.to_dict("index").values()) + result_values = list(results.to_dict("index").values()) - min_value = 100 - max_value = 0 + min_value = 100 + max_value = 0 - for result in result_values: - value = float(result["_distance"]) - if value > max_value: - max_value = value - if value < min_value: - min_value = value + for result in result_values: + value = float(result["_distance"]) + if value > max_value: + max_value = value + if value < min_value: + min_value = value - normalized_values = [(result["_distance"] - min_value) / (max_value - min_value) for result in result_values] + normalized_values = [(result["_distance"] - min_value) / (max_value - min_value) for result in result_values] - return [ScoredResult( - id = str(result["id"]), - payload = result["payload"], - score = normalized_values[value_index], - ) for value_index, result in enumerate(result_values)] + return [ScoredResult( + id = str(result["id"]), + payload = result["payload"], + score = normalized_values[value_index], + ) for value_index, result in enumerate(result_values)] async def batch_search( self, @@ -160,10 +164,10 @@ async def batch_search( ) async def delete_data_points(self, collection_name: str, data_point_ids: list[str]): - connection = await self.get_async_session() - collection = await connection.open_table(collection_name) - results = await collection.delete(f"id IN {tuple(data_point_ids)}") - return results + async 
with self.engine.begin() as connection: + collection = await connection.open_table(collection_name) + results = await collection.delete(f"id IN {tuple(data_point_ids)}") + return results async def prune(self): # Clean up the database if it was set up as temporary diff --git a/cognee/infrastructure/databases/vector/pgvector/__init__.py b/cognee/infrastructure/databases/vector/pgvector/__init__.py index e69de29b..84dc8911 100644 --- a/cognee/infrastructure/databases/vector/pgvector/__init__.py +++ b/cognee/infrastructure/databases/vector/pgvector/__init__.py @@ -0,0 +1 @@ +from .PGVectorAdapter import PGVectorAdapter \ No newline at end of file From 9fbf2d857fab4af8f09b651ff5b71b02949f0cbd Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 17 Oct 2024 17:05:38 +0200 Subject: [PATCH 04/32] feat: Add PGVector support Added first working iteration of PGVector for cognee, some important funcionality is still missing, but the core is there. Also some refactoring will be necessary. Feature: #COG-170 --- cognee/api/v1/add/add.py | 5 +- cognee/api/v1/add/add_v2.py | 6 +- .../databases/vector/__init__.py | 1 + .../databases/vector/create_db_and_tables.py | 15 ++ .../databases/vector/create_vector_engine.py | 3 +- .../vector/pgvector/PGVectorAdapter.py | 225 ++++++++++-------- 6 files changed, 154 insertions(+), 101 deletions(-) create mode 100644 cognee/infrastructure/databases/vector/create_db_and_tables.py diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 85f0688a..b0b2fc8c 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -14,9 +14,12 @@ from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.modules.users.models import User from cognee.modules.data.methods import create_dataset +from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables +from cognee.infrastructure.databases.vector import create_db_and_tables as create_vector_db_and_tables async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None): - await create_db_and_tables() + await create_relational_db_and_tables() + await create_vector_db_and_tables() if isinstance(data, str): if "data://" in data: diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py index 291ec5f4..f32f470a 100644 --- a/cognee/api/v1/add/add_v2.py +++ b/cognee/api/v1/add/add_v2.py @@ -3,10 +3,12 @@ from cognee.modules.users.methods import get_default_user from cognee.modules.pipelines import run_tasks, Task from cognee.tasks.ingestion import save_data_to_storage, ingest_data -from cognee.infrastructure.databases.relational import create_db_and_tables +from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables +from cognee.infrastructure.databases.vector import create_db_and_tables as create_vector_db_and_tables async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_name: str = "main_dataset", user: User = None): - await create_db_and_tables() + await create_relational_db_and_tables() + await create_vector_db_and_tables() if user is None: user = await get_default_user() diff --git a/cognee/infrastructure/databases/vector/__init__.py b/cognee/infrastructure/databases/vector/__init__.py index 604170f1..02d13bb9 100644 --- a/cognee/infrastructure/databases/vector/__init__.py +++ b/cognee/infrastructure/databases/vector/__init__.py @@ -4,3 +4,4 @@ from .vector_db_interface import 
VectorDBInterface from .config import get_vectordb_config from .get_vector_engine import get_vector_engine +from .create_db_and_tables import create_db_and_tables diff --git a/cognee/infrastructure/databases/vector/create_db_and_tables.py b/cognee/infrastructure/databases/vector/create_db_and_tables.py new file mode 100644 index 00000000..21522db6 --- /dev/null +++ b/cognee/infrastructure/databases/vector/create_db_and_tables.py @@ -0,0 +1,15 @@ +from ..relational.ModelBase import Base +from .get_vector_engine import get_vector_engine, get_vectordb_config +from sqlalchemy import text + +async def create_db_and_tables(): + vector_config = get_vectordb_config() + vector_engine = get_vector_engine() + + if vector_config.vector_engine_provider == "pgvector": + async with vector_engine.engine.begin() as connection: + if len(Base.metadata.tables.keys()) > 0: + await connection.run_sync(Base.metadata.create_all) + await connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;")) + + diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index 2399eac0..58e6dbf0 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -42,7 +42,8 @@ def create_vector_engine(config: VectorConfig, embedding_engine): # Get name of vector database db_name = config["vector_db_name"] - connection_string = f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" + connection_string: str = f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" + return PGVectorAdapter(connection_string, config["vector_db_key"], embedding_engine diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 8b79fb9d..b0a44bb8 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -1,27 +1,35 @@ -from typing import List, Optional, get_type_hints, Generic, TypeVar -import asyncio +from typing import List, Optional, get_type_hints, Any, Dict +from sqlalchemy import text, select +from sqlalchemy import JSON, Column, Table +from sqlalchemy.dialects.postgresql import ARRAY from ..models.ScoredResult import ScoredResult from ..vector_db_interface import VectorDBInterface, DataPoint +from sqlalchemy.orm import Mapped, mapped_column from ..embeddings.EmbeddingEngine import EmbeddingEngine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker - -from sqlalchemy.orm import DeclarativeBase, mapped_column from pgvector.sqlalchemy import Vector from ...relational.sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter - - -# Define the models -class Base(DeclarativeBase): - pass +from ...relational.ModelBase import Base + +from datetime import datetime + +# TODO: Find better location for function +def serialize_datetime(data): + """Recursively convert datetime objects in dictionaries/lists to ISO format.""" + if isinstance(data, dict): + return {key: serialize_datetime(value) for key, value in data.items()} + elif isinstance(data, list): + return [serialize_datetime(item) for item in data] + elif isinstance(data, datetime): + return data.isoformat() # Convert datetime to ISO 8601 string + else: + return data class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): - async def 
create_vector_extension(self): - async with self.get_async_session() as session: - await session.execute(text("CREATE EXTENSION IF NOT EXISTS vector")) - def __init__(self, connection_string: str, + def __init__(self, connection_string: str, api_key: Optional[str], embedding_engine: EmbeddingEngine ): @@ -29,121 +37,156 @@ def __init__(self, connection_string: str, self.embedding_engine = embedding_engine self.db_uri: str = connection_string - self.engine = create_async_engine(connection_string) + self.engine = create_async_engine(self.db_uri, echo=True) self.sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False) - self.create_vector_extension() async def embed_data(self, data: list[str]) -> list[list[float]]: return await self.embedding_engine.embed_text(data) async def has_collection(self, collection_name: str) -> bool: async with self.engine.begin() as connection: - collection_names = await connection.table_names() - return collection_name in collection_names + #TODO: Switch to using ORM instead of raw query + result = await connection.execute( + text("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';") + ) + tables = result.fetchall() + for table in tables: + if collection_name == table[0]: + return True + return False async def create_collection(self, collection_name: str, payload_schema = None): data_point_types = get_type_hints(DataPoint) vector_size = self.embedding_engine.get_vector_size() - class PGVectorDataPoint(Base): - id: Mapped[int] = mapped_column(data_point_types["id"], primary_key=True) - vector = mapped_column(Vector(vector_size)) - payload: mapped_column(payload_schema) - if not await self.has_collection(collection_name): + + class PGVectorDataPoint(Base): + __tablename__ = collection_name + __table_args__ = {'extend_existing': True} + # PGVector requires one column to be the primary key + primary_key: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + id: Mapped[data_point_types["id"]] + payload = Column(JSON) + vector = Column(Vector(vector_size)) + + def __init__(self, id, payload, vector): + self.id = id + self.payload = payload + self.vector = vector + async with self.engine.begin() as connection: - return await connection.create_table( - name = collection_name, - schema = PGVectorDataPoint, - exist_ok = True, - ) + if len(Base.metadata.tables.keys()) > 0: + await connection.run_sync(Base.metadata.create_all, tables=[PGVectorDataPoint.__table__]) async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): - async with self.engine.begin() as connection: + async with self.get_async_session() as session: if not await self.has_collection(collection_name): await self.create_collection( - collection_name, + collection_name = collection_name, payload_schema = type(data_points[0].payload), ) - collection = await connection.open_table(collection_name) - data_vectors = await self.embed_data( [data_point.get_embeddable_data() for data_point in data_points] ) - IdType = TypeVar("IdType") - PayloadSchema = TypeVar("PayloadSchema") vector_size = self.embedding_engine.get_vector_size() - class PGVectorDataPoint(Base, Generic[IdType, PayloadSchema]): - id: Mapped[int] = mapped_column(IdType, primary_key=True) - vector = mapped_column(Vector(vector_size)) - payload: mapped_column(PayloadSchema) + class PGVectorDataPoint(Base): + __tablename__ = collection_name + __table_args__ = {'extend_existing': True} + # PGVector requires one column to be the primary key + primary_key: Mapped[int] = 
mapped_column(primary_key=True, autoincrement=True) + id: Mapped[type(data_points[0].id)] + payload = Column(JSON) + vector = Column(Vector(vector_size)) + + def __init__(self, id, payload, vector): + self.id = id + self.payload = payload + self.vector = vector pgvector_data_points = [ - PGVectorDataPoint[type(data_point.id), type(data_point.payload)]( + PGVectorDataPoint( id = data_point.id, vector = data_vectors[data_index], - payload = data_point.payload, + payload = serialize_datetime(data_point.payload.dict()) ) for (data_index, data_point) in enumerate(data_points) ] - await collection.add(pgvector_data_points) - - async def retrieve(self, collection_name: str, data_point_ids: list[str]): - async with self.engine.begin() as connection: - collection = await connection.open_table(collection_name) - - if len(data_point_ids) == 1: - results = await collection.query().where(f"id = '{data_point_ids[0]}'").to_pandas() - else: - results = await collection.query().where(f"id IN {tuple(data_point_ids)}").to_pandas() - - return [ScoredResult( - id = result["id"], - payload = result["payload"], - score = 0, - ) for result in results.to_dict("index").values()] + session.add_all(pgvector_data_points) + await session.commit() + + async def retrieve(self, collection_name: str, data_point_ids: List[str]): + async with AsyncSession(self.engine) as session: + try: + # Construct the SQL query + # TODO: Switch to using ORM instead of raw query + if len(data_point_ids) == 1: + query = text(f"SELECT * FROM {collection_name} WHERE id = :id") + result = await session.execute(query, {"id": data_point_ids[0]}) + else: + query = text(f"SELECT * FROM {collection_name} WHERE id = ANY(:ids)") + result = await session.execute(query, {"ids": data_point_ids}) + + # Fetch all rows + rows = result.fetchall() + + return [ + ScoredResult( + id=row["id"], + payload=row["payload"], + score=0 + ) + for row in rows + ] + except Exception as e: + print(f"Error retrieving data: {e}") + return [] async def search( self, collection_name: str, - query_text: str = None, - query_vector: List[float] = None, + query_text: Optional[str] = None, + query_vector: Optional[List[float]] = None, limit: int = 5, with_vector: bool = False, - ): + ) -> List[ScoredResult]: + # Validate inputs if query_text is None and query_vector is None: raise ValueError("One of query_text or query_vector must be provided!") + # Get the vector for query_text if provided if query_text and not query_vector: query_vector = (await self.embedding_engine.embed_text([query_text]))[0] - async with self.engine.begin() as connection: - collection = await connection.open_table(collection_name) - - results = await collection.vector_search(query_vector).limit(limit).to_pandas() - - result_values = list(results.to_dict("index").values()) - - min_value = 100 - max_value = 0 - - for result in result_values: - value = float(result["_distance"]) - if value > max_value: - max_value = value - if value < min_value: - min_value = value - - normalized_values = [(result["_distance"] - min_value) / (max_value - min_value) for result in result_values] - - return [ScoredResult( - id = str(result["id"]), - payload = result["payload"], - score = normalized_values[value_index], - ) for value_index, result in enumerate(result_values)] + # Use async session to connect to the database + async with self.get_async_session() as session: + try: + PGVectorDataPoint = Table(collection_name, Base.metadata, autoload_with=self.engine) + + closest_items = await 
session.execute(select(PGVectorDataPoint, PGVectorDataPoint.c.vector.cosine_distance(query_vector).label('similarity')).order_by(PGVectorDataPoint.c.vector.cosine_distance(query_vector)).limit(limit)) + + vector_list = [] + # Extract distances and find min/max for normalization + for vector in closest_items: + #TODO: Add normalization of similarity score + vector_list.append(vector) + + # Create and return ScoredResult objects + return [ + ScoredResult( + id=str(row.id), + payload=row.payload, + score=row.similarity + ) + for row in vector_list + ] + + except Exception as e: + print(f"Error during search: {e}") + return [] async def batch_search( self, @@ -152,23 +195,11 @@ async def batch_search( limit: int = None, with_vectors: bool = False, ): - query_vectors = await self.embedding_engine.embed_text(query_texts) - - return asyncio.gather( - *[self.search( - collection_name = collection_name, - query_vector = query_vector, - limit = limit, - with_vector = with_vectors, - ) for query_vector in query_vectors] - ) + pass async def delete_data_points(self, collection_name: str, data_point_ids: list[str]): - async with self.engine.begin() as connection: - collection = await connection.open_table(collection_name) - results = await collection.delete(f"id IN {tuple(data_point_ids)}") - return results + pass async def prune(self): # Clean up the database if it was set up as temporary - self.delete_database() + await self.delete_database() From 9b9ae6c8aaa6be3c663b95d28d4d4f9a6d8525c1 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 17 Oct 2024 17:13:40 +0200 Subject: [PATCH 05/32] refactor: Remove unused env parameter Remove VECTOR_DB_NAME env parameter as it's not needed Refactor #COG-170 --- .env.template | 16 ++++++++-------- cognee/infrastructure/databases/vector/config.py | 2 -- .../databases/vector/create_vector_engine.py | 7 ++----- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/.env.template b/.env.template index c6194a64..4e9e16e8 100644 --- a/.env.template +++ b/.env.template @@ -7,26 +7,26 @@ GRAPHISTRY_PASSWORD= SENTRY_REPORTING_URL= -GRAPH_DATABASE_PROVIDER="neo4j" # "neo4j" or "networkx" +# "neo4j" or "networkx" +GRAPH_DATABASE_PROVIDER="neo4j" # Not needed if using networkx GRAPH_DATABASE_URL= GRAPH_DATABASE_USERNAME= GRAPH_DATABASE_PASSWORD= -VECTOR_ENGINE_PROVIDER="qdrant" # or "qdrant", "pgvector", "weaviate" or "lancedb" +# "qdrant", "pgvector", "weaviate" or "lancedb" +VECTOR_ENGINE_PROVIDER="qdrant" # Not needed if using "lancedb" or "pgvector" VECTOR_DB_URL= VECTOR_DB_KEY= -# Needed if using "pgvector" -VECTOR_DB_NAME= -# Relational Database provider -DB_PROVIDER="sqlite" # "sqlite" or "postgres" +# Relational Database provider "sqlite" or "postgres" +DB_PROVIDER="sqlite" -# Relational Database name +# Database name DB_NAME=cognee_db -# Postgres specific parameters (Only if Postgres or PGVector is run) +# Postgres specific parameters (Only if Postgres or PGVector is used) DB_HOST=127.0.0.1 DB_PORT=5432 DB_USERNAME=cognee diff --git a/cognee/infrastructure/databases/vector/config.py b/cognee/infrastructure/databases/vector/config.py index 5e403c92..8137a067 100644 --- a/cognee/infrastructure/databases/vector/config.py +++ b/cognee/infrastructure/databases/vector/config.py @@ -10,7 +10,6 @@ class VectorConfig(BaseSettings): ) vector_db_key: str = "" vector_engine_provider: str = "lancedb" - vector_db_name: str = "cognee_vector_db" model_config = SettingsConfigDict(env_file = ".env", extra = "allow") @@ -19,7 +18,6 @@ def to_dict(self) -> dict: 
"vector_db_url": self.vector_db_url, "vector_db_key": self.vector_db_key, "vector_db_provider": self.vector_engine_provider, - "vector_db_name": self.vector_db_name, } @lru_cache diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index 58e6dbf0..134a8e9f 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -6,7 +6,6 @@ class VectorConfig(Dict): vector_db_url: str vector_db_key: str vector_db_provider: str - vector_db_name: str def create_vector_engine(config: VectorConfig, embedding_engine): if config["vector_db_provider"] == "weaviate": @@ -38,10 +37,8 @@ def create_vector_engine(config: VectorConfig, embedding_engine): db_password = relational_config.db_password db_host = relational_config.db_host db_port = relational_config.db_port - - # Get name of vector database - db_name = config["vector_db_name"] - + db_name = relational_config.db_name + connection_string: str = f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" return PGVectorAdapter(connection_string, From aa26eabdbb1ae6fb689b1886f1accc34edbd84c2 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 17 Oct 2024 17:26:26 +0200 Subject: [PATCH 06/32] refactor: Remove echo for database Removed echoing of database operations Refactor #COG-170 --- .../infrastructure/databases/vector/pgvector/PGVectorAdapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index b0a44bb8..2b21f4d0 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -37,7 +37,7 @@ def __init__(self, connection_string: str, self.embedding_engine = embedding_engine self.db_uri: str = connection_string - self.engine = create_async_engine(self.db_uri, echo=True) + self.engine = create_async_engine(self.db_uri) self.sessionmaker = async_sessionmaker(bind=self.engine, expire_on_commit=False) async def embed_data(self, data: list[str]) -> list[list[float]]: From 02cd2408d66ef7432533303a9678b625aa2a914e Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 17 Oct 2024 18:11:11 +0200 Subject: [PATCH 07/32] feat: Add batch search to PGVectorAdapter Added batch search to PGVectorAdapter Feature #COG-170 --- .../databases/vector/pgvector/PGVectorAdapter.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 2b21f4d0..f77d1ee2 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -1,8 +1,8 @@ from typing import List, Optional, get_type_hints, Any, Dict from sqlalchemy import text, select from sqlalchemy import JSON, Column, Table -from sqlalchemy.dialects.postgresql import ARRAY from ..models.ScoredResult import ScoredResult +import asyncio from ..vector_db_interface import VectorDBInterface, DataPoint from sqlalchemy.orm import Mapped, mapped_column @@ -153,11 +153,9 @@ async def search( limit: int = 5, with_vector: bool = False, ) -> List[ScoredResult]: - # Validate inputs if query_text is None and query_vector is None: raise ValueError("One of 
query_text or query_vector must be provided!") - # Get the vector for query_text if provided if query_text and not query_vector: query_vector = (await self.embedding_engine.embed_text([query_text]))[0] @@ -195,7 +193,16 @@ async def batch_search( limit: int = None, with_vectors: bool = False, ): - pass + query_vectors = await self.embedding_engine.embed_text(query_texts) + + return asyncio.gather( + *[self.search( + collection_name = collection_name, + query_vector = query_vector, + limit = limit, + with_vector = with_vectors, + ) for query_vector in query_vectors] + ) async def delete_data_points(self, collection_name: str, data_point_ids: list[str]): pass From 325e6cd65465c2ed26acf0abc3e132327aa93f53 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 18 Oct 2024 13:33:40 +0200 Subject: [PATCH 08/32] refactor: Rewrite search query Optimize search query for PGVector Refactor #COG-170 --- .../infrastructure/databases/vector/pgvector/PGVectorAdapter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index f77d1ee2..c592a5b7 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -164,7 +164,7 @@ async def search( try: PGVectorDataPoint = Table(collection_name, Base.metadata, autoload_with=self.engine) - closest_items = await session.execute(select(PGVectorDataPoint, PGVectorDataPoint.c.vector.cosine_distance(query_vector).label('similarity')).order_by(PGVectorDataPoint.c.vector.cosine_distance(query_vector)).limit(limit)) + closest_items = await session.execute(select(PGVectorDataPoint, PGVectorDataPoint.c.vector.cosine_distance(query_vector).label('similarity')).order_by('similarity').limit(limit)) vector_list = [] # Extract distances and find min/max for normalization From 2cd255768e7d9ca225385ddb9446ef7c3b436522 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 18 Oct 2024 14:46:33 +0200 Subject: [PATCH 09/32] refactor: Add formatting to PGVector Adapter Formatted PGVectorAdapter Refactor #COG-170 --- .../vector/pgvector/PGVectorAdapter.py | 97 ++++++++++++------- 1 file changed, 61 insertions(+), 36 deletions(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index c592a5b7..9b8ef1c8 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -15,6 +15,7 @@ from datetime import datetime + # TODO: Find better location for function def serialize_datetime(data): """Recursively convert datetime objects in dictionaries/lists to ISO format.""" @@ -27,11 +28,14 @@ def serialize_datetime(data): else: return data + class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): - def __init__(self, connection_string: str, + def __init__( + self, + connection_string: str, api_key: Optional[str], - embedding_engine: EmbeddingEngine + embedding_engine: EmbeddingEngine, ): self.api_key = api_key self.embedding_engine = embedding_engine @@ -45,9 +49,11 @@ async def embed_data(self, data: list[str]) -> list[list[float]]: async def has_collection(self, collection_name: str) -> bool: async with self.engine.begin() as connection: - #TODO: Switch to using ORM instead of raw query + # TODO: Switch to using ORM instead of raw query result = await connection.execute( 
- text("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';") + text( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';" + ) ) tables = result.fetchall() for table in tables: @@ -55,17 +61,19 @@ async def has_collection(self, collection_name: str) -> bool: return True return False - async def create_collection(self, collection_name: str, payload_schema = None): + async def create_collection(self, collection_name: str, payload_schema=None): data_point_types = get_type_hints(DataPoint) vector_size = self.embedding_engine.get_vector_size() if not await self.has_collection(collection_name): - + class PGVectorDataPoint(Base): __tablename__ = collection_name - __table_args__ = {'extend_existing': True} + __table_args__ = {"extend_existing": True} # PGVector requires one column to be the primary key - primary_key: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + primary_key: Mapped[int] = mapped_column( + primary_key=True, autoincrement=True + ) id: Mapped[data_point_types["id"]] payload = Column(JSON) vector = Column(Vector(vector_size)) @@ -77,14 +85,18 @@ def __init__(self, id, payload, vector): async with self.engine.begin() as connection: if len(Base.metadata.tables.keys()) > 0: - await connection.run_sync(Base.metadata.create_all, tables=[PGVectorDataPoint.__table__]) + await connection.run_sync( + Base.metadata.create_all, tables=[PGVectorDataPoint.__table__] + ) - async def create_data_points(self, collection_name: str, data_points: List[DataPoint]): + async def create_data_points( + self, collection_name: str, data_points: List[DataPoint] + ): async with self.get_async_session() as session: if not await self.has_collection(collection_name): await self.create_collection( - collection_name = collection_name, - payload_schema = type(data_points[0].payload), + collection_name=collection_name, + payload_schema=type(data_points[0].payload), ) data_vectors = await self.embed_data( @@ -95,9 +107,11 @@ async def create_data_points(self, collection_name: str, data_points: List[DataP class PGVectorDataPoint(Base): __tablename__ = collection_name - __table_args__ = {'extend_existing': True} + __table_args__ = {"extend_existing": True} # PGVector requires one column to be the primary key - primary_key: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + primary_key: Mapped[int] = mapped_column( + primary_key=True, autoincrement=True + ) id: Mapped[type(data_points[0].id)] payload = Column(JSON) vector = Column(Vector(vector_size)) @@ -109,10 +123,11 @@ def __init__(self, id, payload, vector): pgvector_data_points = [ PGVectorDataPoint( - id = data_point.id, - vector = data_vectors[data_index], - payload = serialize_datetime(data_point.payload.dict()) - ) for (data_index, data_point) in enumerate(data_points) + id=data_point.id, + vector=data_vectors[data_index], + payload=serialize_datetime(data_point.payload.dict()), + ) + for (data_index, data_point) in enumerate(data_points) ] session.add_all(pgvector_data_points) @@ -127,18 +142,16 @@ async def retrieve(self, collection_name: str, data_point_ids: List[str]): query = text(f"SELECT * FROM {collection_name} WHERE id = :id") result = await session.execute(query, {"id": data_point_ids[0]}) else: - query = text(f"SELECT * FROM {collection_name} WHERE id = ANY(:ids)") + query = text( + f"SELECT * FROM {collection_name} WHERE id = ANY(:ids)" + ) result = await session.execute(query, {"ids": data_point_ids}) # Fetch all rows rows = result.fetchall() return [ - 
ScoredResult( - id=row["id"], - payload=row["payload"], - score=0 - ) + ScoredResult(id=row["id"], payload=row["payload"], score=0) for row in rows ] except Exception as e: @@ -162,22 +175,31 @@ async def search( # Use async session to connect to the database async with self.get_async_session() as session: try: - PGVectorDataPoint = Table(collection_name, Base.metadata, autoload_with=self.engine) + PGVectorDataPoint = Table( + collection_name, Base.metadata, autoload_with=self.engine + ) - closest_items = await session.execute(select(PGVectorDataPoint, PGVectorDataPoint.c.vector.cosine_distance(query_vector).label('similarity')).order_by('similarity').limit(limit)) + closest_items = await session.execute( + select( + PGVectorDataPoint, + PGVectorDataPoint.c.vector.cosine_distance(query_vector).label( + "similarity" + ), + ) + .order_by("similarity") + .limit(limit) + ) vector_list = [] # Extract distances and find min/max for normalization for vector in closest_items: - #TODO: Add normalization of similarity score + # TODO: Add normalization of similarity score vector_list.append(vector) # Create and return ScoredResult objects return [ ScoredResult( - id=str(row.id), - payload=row.payload, - score=row.similarity + id=str(row.id), payload=row.payload, score=row.similarity ) for row in vector_list ] @@ -196,12 +218,15 @@ async def batch_search( query_vectors = await self.embedding_engine.embed_text(query_texts) return asyncio.gather( - *[self.search( - collection_name = collection_name, - query_vector = query_vector, - limit = limit, - with_vector = with_vectors, - ) for query_vector in query_vectors] + *[ + self.search( + collection_name=collection_name, + query_vector=query_vector, + limit=limit, + with_vector=with_vectors, + ) + for query_vector in query_vectors + ] ) async def delete_data_points(self, collection_name: str, data_point_ids: list[str]): From 7f7b015ea3cf3e9144aed27a4f65799255d951d8 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 18 Oct 2024 15:00:51 +0200 Subject: [PATCH 10/32] refactor: Add formatting to create_vector_engine Added formatting for PGVector part of create_vector_engine Refactor #COG-170 --- .../infrastructure/databases/vector/create_vector_engine.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index 134a8e9f..b1d18575 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -38,8 +38,10 @@ def create_vector_engine(config: VectorConfig, embedding_engine): db_host = relational_config.db_host db_port = relational_config.db_port db_name = relational_config.db_name - - connection_string: str = f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" + + connection_string: str = ( + f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" + ) return PGVectorAdapter(connection_string, config["vector_db_key"], From d2772d22b871b509161f1e0df1d95e69de414d19 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 18 Oct 2024 15:03:28 +0200 Subject: [PATCH 11/32] refactor: Formatting change for create_vector_engine Change formatting for connection string for PGVectorAdapter Refactor #COG-170 --- cognee/infrastructure/databases/vector/create_vector_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/cognee/infrastructure/databases/vector/create_vector_engine.py b/cognee/infrastructure/databases/vector/create_vector_engine.py index b1d18575..f0cbfcd5 100644 --- a/cognee/infrastructure/databases/vector/create_vector_engine.py +++ b/cognee/infrastructure/databases/vector/create_vector_engine.py @@ -41,7 +41,7 @@ def create_vector_engine(config: VectorConfig, embedding_engine): connection_string: str = ( f"postgresql+asyncpg://{db_username}:{db_password}@{db_host}:{db_port}/{db_name}" - ) + ) return PGVectorAdapter(connection_string, config["vector_db_key"], From 240c660eac3a294c0d2cf235e6934de2ab145f3d Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 12:59:24 +0200 Subject: [PATCH 12/32] refactor: Change raw SQL queries to SQLalchemy ORM for PGVectorAdapter Changed raw SQL quries to use SQLalchemy ORM for PGVectorAdapter Refactor #COG-170 --- .../vector/pgvector/PGVectorAdapter.py | 85 ++++++++++--------- 1 file changed, 45 insertions(+), 40 deletions(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 9b8ef1c8..b9a71666 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -1,13 +1,13 @@ -from typing import List, Optional, get_type_hints, Any, Dict -from sqlalchemy import text, select -from sqlalchemy import JSON, Column, Table +from typing import List, Optional, get_type_hints +from sqlalchemy import JSON, Column, Table, select, delete + from ..models.ScoredResult import ScoredResult import asyncio from ..vector_db_interface import VectorDBInterface, DataPoint from sqlalchemy.orm import Mapped, mapped_column from ..embeddings.EmbeddingEngine import EmbeddingEngine -from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker from pgvector.sqlalchemy import Vector from ...relational.sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter @@ -49,17 +49,13 @@ async def embed_data(self, data: list[str]) -> list[list[float]]: async def has_collection(self, collection_name: str) -> bool: async with self.engine.begin() as connection: - # TODO: Switch to using ORM instead of raw query - result = await connection.execute( - text( - "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public';" - ) - ) - tables = result.fetchall() - for table in tables: - if collection_name == table[0]: - return True - return False + # Load the schema information into the MetaData object + await connection.run_sync(Base.metadata.reflect) + + if collection_name in Base.metadata.tables: + return True + else: + return False async def create_collection(self, collection_name: str, payload_schema=None): data_point_types = get_type_hints(DataPoint) @@ -133,30 +129,32 @@ def __init__(self, id, payload, vector): session.add_all(pgvector_data_points) await session.commit() + async def get_table(self, collection_name: str) -> Table: + """ + Dynamically loads a table using the given collection name + with an async engine. 
+ """ + async with self.engine.begin() as connection: + await connection.run_sync(Base.metadata.reflect) # Reflect the metadata + if collection_name in Base.metadata.tables: + return Base.metadata.tables[collection_name] + else: + raise ValueError(f"Table '{collection_name}' not found.") + async def retrieve(self, collection_name: str, data_point_ids: List[str]): - async with AsyncSession(self.engine) as session: - try: - # Construct the SQL query - # TODO: Switch to using ORM instead of raw query - if len(data_point_ids) == 1: - query = text(f"SELECT * FROM {collection_name} WHERE id = :id") - result = await session.execute(query, {"id": data_point_ids[0]}) - else: - query = text( - f"SELECT * FROM {collection_name} WHERE id = ANY(:ids)" - ) - result = await session.execute(query, {"ids": data_point_ids}) + async with self.get_async_session() as session: + # Get PGVectorDataPoint Table from database + PGVectorDataPoint = await self.get_table(collection_name) - # Fetch all rows - rows = result.fetchall() + results = await session.execute( + select(PGVectorDataPoint).where(PGVectorDataPoint.c.id.in_(data_point_ids)) + ) + results = results.all() - return [ - ScoredResult(id=row["id"], payload=row["payload"], score=0) - for row in rows - ] - except Exception as e: - print(f"Error retrieving data: {e}") - return [] + return [ + ScoredResult(id=result.id, payload=result.payload, score=0) + for result in results + ] async def search( self, @@ -175,10 +173,10 @@ async def search( # Use async session to connect to the database async with self.get_async_session() as session: try: - PGVectorDataPoint = Table( - collection_name, Base.metadata, autoload_with=self.engine - ) + # Get PGVectorDataPoint Table from database + PGVectorDataPoint = await self.get_table(collection_name) + # Find closest vectors to query_vector closest_items = await session.execute( select( PGVectorDataPoint, @@ -230,7 +228,14 @@ async def batch_search( ) async def delete_data_points(self, collection_name: str, data_point_ids: list[str]): - pass + async with self.get_async_session() as session: + # Get PGVectorDataPoint Table from database + PGVectorDataPoint = await self.get_table(collection_name) + results = await session.execute( + delete(PGVectorDataPoint).where(PGVectorDataPoint.c.id.in_(data_point_ids)) + ) + await session.commit() + return results async def prune(self): # Clean up the database if it was set up as temporary From 05e4ef349e82fc385ac53fab663e72a697e29cf9 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 14:28:57 +0200 Subject: [PATCH 13/32] fix: Fix pruning of postgres database Fixed issue with newly added tables not being pruned from postgres database Fix #COG-170 --- cognee/api/v1/config/config.py | 35 +++++++++++++++++++ .../sqlalchemy/SqlAlchemyAdapter.py | 2 ++ .../vector/pgvector/PGVectorAdapter.py | 3 +- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/cognee/api/v1/config/config.py b/cognee/api/v1/config/config.py index 7d2c7c6f..d5acfd08 100644 --- a/cognee/api/v1/config/config.py +++ b/cognee/api/v1/config/config.py @@ -39,6 +39,41 @@ def set_classification_model(classification_model: object): cognify_config = get_cognify_config() cognify_config.classification_model = classification_model + @staticmethod + def set_db_name(db_name: str): + cognify_config = get_relational_config() + cognify_config.db_name = db_name + + @staticmethod + def set_db_path(db_path: str): + cognify_config = get_relational_config() + cognify_config.db_path = db_path + + @staticmethod + def 
set_db_host(db_host: str): + cognify_config = get_relational_config() + cognify_config.db_host = db_host + + @staticmethod + def set_db_port(db_port: str): + cognify_config = get_relational_config() + cognify_config.db_port = db_port + + @staticmethod + def set_db_username(db_username: str): + cognify_config = get_relational_config() + cognify_config.db_username = db_username + + @staticmethod + def set_db_password(db_password: str): + cognify_config = get_relational_config() + cognify_config.db_password = db_password + + @staticmethod + def set_db_provider(db_provider: str): + cognify_config = get_relational_config() + cognify_config.db_provider = db_provider + @staticmethod def set_summarization_model(summarization_model: object): cognify_config = get_cognify_config() diff --git a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py index 36302bce..81a828bd 100644 --- a/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py +++ b/cognee/infrastructure/databases/relational/sqlalchemy/SqlAlchemyAdapter.py @@ -119,6 +119,8 @@ async def delete_database(self): self.db_path = None else: async with self.engine.begin() as connection: + # Load the schema information into the MetaData object + await connection.run_sync(Base.metadata.reflect) for table in Base.metadata.sorted_tables: drop_table_query = text(f"DROP TABLE IF EXISTS {table.name} CASCADE") await connection.execute(drop_table_query) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index b9a71666..8b873b2f 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -135,7 +135,8 @@ async def get_table(self, collection_name: str) -> Table: with an async engine. """ async with self.engine.begin() as connection: - await connection.run_sync(Base.metadata.reflect) # Reflect the metadata + # Load the schema information into the MetaData object + await connection.run_sync(Base.metadata.reflect) if collection_name in Base.metadata.tables: return Base.metadata.tables[collection_name] else: From f8babbaa5343893bd4d6b9ce5c9dfe938ecc9392 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 14:33:00 +0200 Subject: [PATCH 14/32] test: Add test for PGVectorAdapter Added test for PGVectorAdapter Test #COG-170 --- cognee/tests/test_pgvector.py | 72 +++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 cognee/tests/test_pgvector.py diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py new file mode 100644 index 00000000..8d839ab5 --- /dev/null +++ b/cognee/tests/test_pgvector.py @@ -0,0 +1,72 @@ + + +import os +import logging +import pathlib +import cognee +from cognee.api.v1.search import SearchType + +logging.basicConfig(level=logging.DEBUG) + +async def main(): + #TODO: Should this be set on pipeline side or in the test? 
+ cognee.config.set_vector_engine_provider("pgvector") + cognee.config.set_db_provider("postgres") + cognee.config.set_db_name("cognee_db") + cognee.config.set_db_host("127.0.0.1") + cognee.config.set_db_port("5432") + cognee.config.set_db_username("cognee") + cognee.config.set_db_password("cognee") + + data_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_pgvector")).resolve()) + cognee.config.data_root_directory(data_directory_path) + cognee_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_pgvector")).resolve()) + cognee.config.system_root_directory(cognee_directory_path) + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata = True) + + dataset_name = "cs_explanations" + + explanation_file_path = os.path.join(pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt") + await cognee.add([explanation_file_path], dataset_name) + + text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena. + At small scales, physical matter exhibits properties of both particles and waves, and quantum computing leverages this behavior, specifically quantum superposition and entanglement, using specialized hardware that supports the preparation and manipulation of quantum states. + Classical physics cannot explain the operation of these quantum devices, and a scalable quantum computer could perform some calculations exponentially faster (with respect to input size scaling) than any modern "classical" computer. In particular, a large-scale quantum computer could break widely used encryption schemes and aid physicists in performing physical simulations; however, the current state of the technology is largely experimental and impractical, with several obstacles to useful applications. Moreover, scalable quantum computers do not hold promise for many practical tasks, and for many important tasks quantum speedups are proven impossible. + The basic unit of information in quantum computing is the qubit, similar to the bit in traditional digital electronics. Unlike a classical bit, a qubit can exist in a superposition of its two "basis" states. When measuring a qubit, the result is a probabilistic output of a classical bit, therefore making quantum computers nondeterministic in general. If a quantum computer manipulates the qubit in a particular way, wave interference effects can amplify the desired measurement results. The design of quantum algorithms involves creating procedures that allow a quantum computer to perform calculations efficiently and quickly. + Physically engineering high-quality qubits has proven challenging. If a physical qubit is not sufficiently isolated from its environment, it suffers from quantum decoherence, introducing noise into calculations. Paradoxically, perfectly isolating qubits is also undesirable because quantum computations typically need to initialize qubits, perform controlled qubit interactions, and measure the resulting quantum states. Each of those operations introduces errors and suffers from noise, and such inaccuracies accumulate. + In principle, a non-quantum (classical) computer can solve the same computational problems as a quantum computer, given enough time. 
Quantum advantage comes in the form of time complexity rather than computability, and quantum complexity theory shows that some quantum algorithms for carefully selected tasks require exponentially fewer computational steps than the best known non-quantum algorithms. Such tasks can in theory be solved on a large-scale quantum computer whereas classical computers would not finish computations in any reasonable amount of time. However, quantum speedup is not universal or even typical across computational tasks, since basic tasks such as sorting are proven to not allow any asymptotic quantum speedup. Claims of quantum supremacy have drawn significant attention to the discipline, but are demonstrated on contrived tasks, while near-term practical use cases remain limited. + """ + + await cognee.add([text], dataset_name) + + await cognee.cognify([dataset_name]) + + from cognee.infrastructure.databases.vector import get_vector_engine + vector_engine = get_vector_engine() + random_node = (await vector_engine.search("entities", "AI"))[0] + random_node_name = random_node.payload["name"] + + search_results = await cognee.search(SearchType.INSIGHTS, query = random_node_name) + assert len(search_results) != 0, "The search results list is empty." + print("\n\nExtracted sentences are:\n") + for result in search_results: + print(f"{result}\n") + + search_results = await cognee.search(SearchType.CHUNKS, query = random_node_name) + assert len(search_results) != 0, "The search results list is empty." + print("\n\nExtracted chunks are:\n") + for result in search_results: + print(f"{result}\n") + + search_results = await cognee.search(SearchType.SUMMARIES, query = random_node_name) + assert len(search_results) != 0, "Query related summaries don't exist." + print("\n\Extracted summaries are:\n") + for result in search_results: + print(f"{result}\n") + + +if __name__ == "__main__": + import asyncio + asyncio.run(main()) From 9f4b8f2f42ed1273434e0243611cb1ed0a01c28b Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 14:50:39 +0200 Subject: [PATCH 15/32] test: Add github action workflow to run PGVectorAdapter integration test Added github action workflow to run PGVectorAdapter integration test Test #COG-170 --- .github/workflows/test_pgvector.yml | 66 +++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 .github/workflows/test_pgvector.yml diff --git a/.github/workflows/test_pgvector.yml b/.github/workflows/test_pgvector.yml new file mode 100644 index 00000000..baed4629 --- /dev/null +++ b/.github/workflows/test_pgvector.yml @@ -0,0 +1,66 @@ +name: test | pgvector + +on: + pull_request: + branches: + - main + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} + cancel-in-progress: true + +env: + RUNTIME__LOG_LEVEL: ERROR + +jobs: + get_docs_changes: + name: docs changes + uses: ./.github/workflows/get_docs_changes.yml + + run_pgvector_integration_test: + name: test + needs: get_docs_changes + if: needs.get_docs_changes.outputs.changes_outside_docs == 'true' + runs-on: ubuntu-latest + defaults: + run: + shell: bash + services: + postgres: + image: pgvector/pgvector:pg17 + env: + POSTGRES_USER: cognee + POSTGRES_PASSWORD: cognee + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Check out + uses: actions/checkout@master + + - name: Setup Python + uses: actions/setup-python@v5 + with: + python-version: 
'3.11.x' + + - name: Install Poetry + uses: snok/install-poetry@v1.3.2 + with: + virtualenvs-create: true + virtualenvs-in-project: true + installer-parallel: true + + - name: Install dependencies + run: poetry install --no-interaction + + - name: Run default PGVector + env: + ENV: 'dev' + LLM_API_KEY: ${{ secrets.OPENAI_API_KEY }} + run: poetry run python ./cognee/tests/test_pgvector.py From 4c381a3ae904514f6bd5da29a09b22ee922714b5 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 15:08:27 +0200 Subject: [PATCH 16/32] chore: Add pgvector dependency Added pgvector dependency for PGVectorAdapter Chore #COG-170 --- cognee/tests/test_pgvector.py | 2 +- poetry.lock | 22 +++++++++++++++++++++- pyproject.toml | 1 + 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py index 8d839ab5..b90c08a7 100644 --- a/cognee/tests/test_pgvector.py +++ b/cognee/tests/test_pgvector.py @@ -12,7 +12,7 @@ async def main(): #TODO: Should this be set on pipeline side or in the test? cognee.config.set_vector_engine_provider("pgvector") cognee.config.set_db_provider("postgres") - cognee.config.set_db_name("cognee_db") + cognee.config.set_db_name("postgres") cognee.config.set_db_host("127.0.0.1") cognee.config.set_db_port("5432") cognee.config.set_db_username("cognee") diff --git a/poetry.lock b/poetry.lock index a8df0414..e897ca39 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3685,6 +3685,7 @@ optional = false python-versions = ">=3.6" files = [ {file = "mkdocs-redirects-1.2.1.tar.gz", hash = "sha256:9420066d70e2a6bb357adf86e67023dcdca1857f97f07c7fe450f8f1fb42f861"}, + {file = "mkdocs_redirects-1.2.1-py3-none-any.whl", hash = "sha256:497089f9e0219e7389304cffefccdfa1cac5ff9509f2cb706f4c9b221726dffb"}, ] [package.dependencies] @@ -4655,6 +4656,20 @@ files = [ [package.dependencies] ptyprocess = ">=0.5" +[[package]] +name = "pgvector" +version = "0.3.5" +description = "pgvector support for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "pgvector-0.3.5-py3-none-any.whl", hash = "sha256:56cca90392e596ea18873c593ec858a1984a77d16d1f82b8d0c180e79ef1018f"}, + {file = "pgvector-0.3.5.tar.gz", hash = "sha256:e876c9ee382c4c2f7ee57691a4c4015d688c7222e47448ce310ded03ecfafe2f"}, +] + +[package.dependencies] +numpy = "*" + [[package]] name = "pillow" version = "10.4.0" @@ -6259,6 +6274,11 @@ files = [ {file = "scikit_learn-1.5.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f60021ec1574e56632be2a36b946f8143bf4e5e6af4a06d85281adc22938e0dd"}, {file = "scikit_learn-1.5.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:394397841449853c2290a32050382edaec3da89e35b3e03d6cc966aebc6a8ae6"}, {file = "scikit_learn-1.5.2-cp312-cp312-win_amd64.whl", hash = "sha256:57cc1786cfd6bd118220a92ede80270132aa353647684efa385a74244a41e3b1"}, + {file = "scikit_learn-1.5.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:e9a702e2de732bbb20d3bad29ebd77fc05a6b427dc49964300340e4c9328b3f5"}, + {file = "scikit_learn-1.5.2-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:b0768ad641981f5d3a198430a1d31c3e044ed2e8a6f22166b4d546a5116d7908"}, + {file = "scikit_learn-1.5.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:178ddd0a5cb0044464fc1bfc4cca5b1833bfc7bb022d70b05db8530da4bb3dd3"}, + {file = "scikit_learn-1.5.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f7284ade780084d94505632241bf78c44ab3b6f1e8ccab3d2af58e0e950f9c12"}, + {file 
= "scikit_learn-1.5.2-cp313-cp313-win_amd64.whl", hash = "sha256:b7b0f9a0b1040830d38c39b91b3a44e1b643f4b36e36567b80b7c6bd2202a27f"}, {file = "scikit_learn-1.5.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:757c7d514ddb00ae249832fe87100d9c73c6ea91423802872d9e74970a0e40b9"}, {file = "scikit_learn-1.5.2-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:52788f48b5d8bca5c0736c175fa6bdaab2ef00a8f536cda698db61bd89c551c1"}, {file = "scikit_learn-1.5.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:643964678f4b5fbdc95cbf8aec638acc7aa70f5f79ee2cdad1eec3df4ba6ead8"}, @@ -7727,4 +7747,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<3.12" -content-hash = "75d65fd1b99bf9db84fe026d140f6cb05b02afd31d4ad82a6286076256bd7609" +content-hash = "2584d5dccf52576f22234cd5b075a11f3aa4feb8aa968b0e174f70a75824664e" diff --git a/pyproject.toml b/pyproject.toml index 0199bb1f..69541f9c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ sentry-sdk = {extras = ["fastapi"], version = "^2.9.0"} fastapi-users = { version = "*", extras = ["sqlalchemy"] } asyncpg = "^0.29.0" alembic = "^1.13.3" +pgvector = "^0.3.5" From 9461ba0fd1425f1d157272c420d2aa9958305b77 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 15:32:51 +0200 Subject: [PATCH 17/32] chore: Add psycopg2 dependency Add psycopg2 a postgresql database adapter dependency Chore #COG-170 --- poetry.lock | 20 +++++++++++++++++++- pyproject.toml | 1 + 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/poetry.lock b/poetry.lock index e897ca39..fc2f8d5d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4928,6 +4928,24 @@ files = [ [package.extras] test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] +[[package]] +name = "psycopg2" +version = "2.9.10" +description = "psycopg2 - Python-PostgreSQL Database Adapter" +optional = false +python-versions = ">=3.8" +files = [ + {file = "psycopg2-2.9.10-cp310-cp310-win32.whl", hash = "sha256:5df2b672140f95adb453af93a7d669d7a7bf0a56bcd26f1502329166f4a61716"}, + {file = "psycopg2-2.9.10-cp310-cp310-win_amd64.whl", hash = "sha256:c6f7b8561225f9e711a9c47087388a97fdc948211c10a4bccbf0ba68ab7b3b5a"}, + {file = "psycopg2-2.9.10-cp311-cp311-win32.whl", hash = "sha256:47c4f9875125344f4c2b870e41b6aad585901318068acd01de93f3677a6522c2"}, + {file = "psycopg2-2.9.10-cp311-cp311-win_amd64.whl", hash = "sha256:0435034157049f6846e95103bd8f5a668788dd913a7c30162ca9503fdf542cb4"}, + {file = "psycopg2-2.9.10-cp312-cp312-win32.whl", hash = "sha256:65a63d7ab0e067e2cdb3cf266de39663203d38d6a8ed97f5ca0cb315c73fe067"}, + {file = "psycopg2-2.9.10-cp312-cp312-win_amd64.whl", hash = "sha256:4a579d6243da40a7b3182e0430493dbd55950c493d8c68f4eec0b302f6bbf20e"}, + {file = "psycopg2-2.9.10-cp39-cp39-win32.whl", hash = "sha256:9d5b3b94b79a844a986d029eee38998232451119ad653aea42bb9220a8c5066b"}, + {file = "psycopg2-2.9.10-cp39-cp39-win_amd64.whl", hash = "sha256:88138c8dedcbfa96408023ea2b0c369eda40fe5d75002c0964c78f46f11fa442"}, + {file = "psycopg2-2.9.10.tar.gz", hash = "sha256:12ec0b40b0273f95296233e8750441339298e6a572f7039da5b260e3c8b60e11"}, +] + [[package]] name = "ptyprocess" version = "0.7.0" @@ -7747,4 +7765,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<3.12" -content-hash = "2584d5dccf52576f22234cd5b075a11f3aa4feb8aa968b0e174f70a75824664e" +content-hash = "505c4f6f2128aea335ea6065221cf4692750e374f1b09667d250b6f77f3e4b59" diff --git a/pyproject.toml b/pyproject.toml index 69541f9c..898ad570 
100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ fastapi-users = { version = "*", extras = ["sqlalchemy"] } asyncpg = "^0.29.0" alembic = "^1.13.3" pgvector = "^0.3.5" +psycopg2 = "^2.9.10" From 2cedcbe6700208836f0a6ea8a1927c91eb4965cf Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 21 Oct 2024 15:51:48 +0200 Subject: [PATCH 18/32] refactor: Change database name in PGVectorAdapter test and workflow Changed database name in test and workflow to be the same as in the .env.template Refactor #COG-170 --- .github/workflows/test_pgvector.yml | 1 + cognee/tests/test_pgvector.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test_pgvector.yml b/.github/workflows/test_pgvector.yml index baed4629..401be800 100644 --- a/.github/workflows/test_pgvector.yml +++ b/.github/workflows/test_pgvector.yml @@ -32,6 +32,7 @@ jobs: env: POSTGRES_USER: cognee POSTGRES_PASSWORD: cognee + POSTGRES_DB: cognee_db options: >- --health-cmd pg_isready --health-interval 10s diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py index b90c08a7..8d839ab5 100644 --- a/cognee/tests/test_pgvector.py +++ b/cognee/tests/test_pgvector.py @@ -12,7 +12,7 @@ async def main(): #TODO: Should this be set on pipeline side or in the test? cognee.config.set_vector_engine_provider("pgvector") cognee.config.set_db_provider("postgres") - cognee.config.set_db_name("postgres") + cognee.config.set_db_name("cognee_db") cognee.config.set_db_host("127.0.0.1") cognee.config.set_db_port("5432") cognee.config.set_db_username("cognee") From 71c1374859dc24ac2c28ada633696e16e3a00f92 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 10:19:39 +0200 Subject: [PATCH 19/32] refactor: Move serialize_datetime function Move serialize_datetime function to be in it's own file Refactor #COG-170 --- .../vector/pgvector/PGVectorAdapter.py | 26 ++++--------------- .../vector/pgvector/serialize_datetime.py | 12 +++++++++ 2 files changed, 17 insertions(+), 21 deletions(-) create mode 100644 cognee/infrastructure/databases/vector/pgvector/serialize_datetime.py diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 8b873b2f..bd10cf0e 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -1,33 +1,17 @@ +import asyncio +from pgvector.sqlalchemy import Vector from typing import List, Optional, get_type_hints +from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy import JSON, Column, Table, select, delete +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker +from .serialize_datetime import serialize_datetime from ..models.ScoredResult import ScoredResult -import asyncio - from ..vector_db_interface import VectorDBInterface, DataPoint -from sqlalchemy.orm import Mapped, mapped_column from ..embeddings.EmbeddingEngine import EmbeddingEngine -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker -from pgvector.sqlalchemy import Vector - from ...relational.sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter from ...relational.ModelBase import Base -from datetime import datetime - - -# TODO: Find better location for function -def serialize_datetime(data): - """Recursively convert datetime objects in dictionaries/lists to ISO format.""" - if isinstance(data, dict): - return {key: serialize_datetime(value) for key, value in 
data.items()} - elif isinstance(data, list): - return [serialize_datetime(item) for item in data] - elif isinstance(data, datetime): - return data.isoformat() # Convert datetime to ISO 8601 string - else: - return data - class PGVectorAdapter(SQLAlchemyAdapter, VectorDBInterface): diff --git a/cognee/infrastructure/databases/vector/pgvector/serialize_datetime.py b/cognee/infrastructure/databases/vector/pgvector/serialize_datetime.py new file mode 100644 index 00000000..9cb979e2 --- /dev/null +++ b/cognee/infrastructure/databases/vector/pgvector/serialize_datetime.py @@ -0,0 +1,12 @@ +from datetime import datetime + +def serialize_datetime(data): + """Recursively convert datetime objects in dictionaries/lists to ISO format.""" + if isinstance(data, dict): + return {key: serialize_datetime(value) for key, value in data.items()} + elif isinstance(data, list): + return [serialize_datetime(item) for item in data] + elif isinstance(data, datetime): + return data.isoformat() # Convert datetime to ISO 8601 string + else: + return data \ No newline at end of file From 4a73505e23411baf10887420f1dd88ec1ac0406f Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 10:29:35 +0200 Subject: [PATCH 20/32] refactor: Move create_db_and_tables module from vectors to pgvector Moved create_db_and_tables module to pgvector so it only handles pgvector related db creation Refactor #COG-170 --- cognee/api/v1/add/add.py | 6 +++--- cognee/api/v1/add/add_v2.py | 4 ++-- cognee/infrastructure/databases/vector/__init__.py | 1 - cognee/infrastructure/databases/vector/pgvector/__init__.py | 3 ++- .../databases/vector/{ => pgvector}/create_db_and_tables.py | 4 ++-- 5 files changed, 9 insertions(+), 9 deletions(-) rename cognee/infrastructure/databases/vector/{ => pgvector}/create_db_and_tables.py (81%) diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index b0b2fc8c..10430ed8 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -8,18 +8,18 @@ from cognee.modules.ingestion import get_matched_datasets, save_data_to_file from cognee.shared.utils import send_telemetry from cognee.base_config import get_base_config -from cognee.infrastructure.databases.relational import get_relational_engine, create_db_and_tables +from cognee.infrastructure.databases.relational import get_relational_engine from cognee.modules.users.methods import get_default_user from cognee.tasks.ingestion import get_dlt_destination from cognee.modules.users.permissions.methods import give_permission_on_document from cognee.modules.users.models import User from cognee.modules.data.methods import create_dataset from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables -from cognee.infrastructure.databases.vector import create_db_and_tables as create_vector_db_and_tables +from cognee.infrastructure.databases.vector.pgvector import create_db_and_tables as create_pgvector_db_and_tables async def add(data: Union[BinaryIO, List[BinaryIO], str, List[str]], dataset_name: str = "main_dataset", user: User = None): await create_relational_db_and_tables() - await create_vector_db_and_tables() + await create_pgvector_db_and_tables() if isinstance(data, str): if "data://" in data: diff --git a/cognee/api/v1/add/add_v2.py b/cognee/api/v1/add/add_v2.py index f32f470a..4d43dd65 100644 --- a/cognee/api/v1/add/add_v2.py +++ b/cognee/api/v1/add/add_v2.py @@ -4,11 +4,11 @@ from cognee.modules.pipelines import run_tasks, Task from cognee.tasks.ingestion import save_data_to_storage, 
ingest_data from cognee.infrastructure.databases.relational import create_db_and_tables as create_relational_db_and_tables -from cognee.infrastructure.databases.vector import create_db_and_tables as create_vector_db_and_tables +from cognee.infrastructure.databases.vector.pgvector import create_db_and_tables as create_pgvector_db_and_tables async def add(data: Union[BinaryIO, list[BinaryIO], str, list[str]], dataset_name: str = "main_dataset", user: User = None): await create_relational_db_and_tables() - await create_vector_db_and_tables() + await create_pgvector_db_and_tables() if user is None: user = await get_default_user() diff --git a/cognee/infrastructure/databases/vector/__init__.py b/cognee/infrastructure/databases/vector/__init__.py index 02d13bb9..604170f1 100644 --- a/cognee/infrastructure/databases/vector/__init__.py +++ b/cognee/infrastructure/databases/vector/__init__.py @@ -4,4 +4,3 @@ from .vector_db_interface import VectorDBInterface from .config import get_vectordb_config from .get_vector_engine import get_vector_engine -from .create_db_and_tables import create_db_and_tables diff --git a/cognee/infrastructure/databases/vector/pgvector/__init__.py b/cognee/infrastructure/databases/vector/pgvector/__init__.py index 84dc8911..130246a3 100644 --- a/cognee/infrastructure/databases/vector/pgvector/__init__.py +++ b/cognee/infrastructure/databases/vector/pgvector/__init__.py @@ -1 +1,2 @@ -from .PGVectorAdapter import PGVectorAdapter \ No newline at end of file +from .PGVectorAdapter import PGVectorAdapter +from .create_db_and_tables import create_db_and_tables \ No newline at end of file diff --git a/cognee/infrastructure/databases/vector/create_db_and_tables.py b/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py similarity index 81% rename from cognee/infrastructure/databases/vector/create_db_and_tables.py rename to cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py index 21522db6..1b09e873 100644 --- a/cognee/infrastructure/databases/vector/create_db_and_tables.py +++ b/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py @@ -1,5 +1,5 @@ -from ..relational.ModelBase import Base -from .get_vector_engine import get_vector_engine, get_vectordb_config +from ...relational.ModelBase import Base +from ..get_vector_engine import get_vector_engine, get_vectordb_config from sqlalchemy import text async def create_db_and_tables(): From a3581689f2e935d965ce98682bdd13152c88bccb Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 11:53:33 +0200 Subject: [PATCH 21/32] refactor: Add setting of database configs through dictionary Added the ability to set database configurations through dictionary for ease of use. Updated test_pgvector to use this way of setting configuration Refactor #COG-170 --- cognee/api/v1/config/config.py | 24 ++++++++++++++++++++++++ cognee/tests/test_pgvector.py | 21 +++++++++++++-------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/cognee/api/v1/config/config.py b/cognee/api/v1/config/config.py index d5acfd08..6b32752c 100644 --- a/cognee/api/v1/config/config.py +++ b/cognee/api/v1/config/config.py @@ -130,6 +130,30 @@ def set_vector_engine_provider(vector_engine_provider: str): vector_db_config = get_vectordb_config() vector_db_config.vector_engine_provider = vector_engine_provider + @staticmethod + def set_relational_db_config(config_dict: dict): + """ + Updates the relational db config with values from config_dict. 
+ """ + relational_db_config = get_relational_config() + for key, value in config_dict.items(): + if hasattr(relational_db_config, key): + object.__setattr__(relational_db_config, key, value) + else: + raise AttributeError(f"'{key}' is not a valid attribute of the config.") + + @staticmethod + def set_vector_db_config(config_dict: dict): + """ + Updates the vector db config with values from config_dict. + """ + vector_db_config = get_vectordb_config() + for key, value in config_dict.items(): + if hasattr(vector_db_config, key): + object.__setattr__(vector_db_config, key, value) + else: + raise AttributeError(f"'{key}' is not a valid attribute of the config.") + @staticmethod def set_vector_db_key(db_key: str): vector_db_config = get_vectordb_config() diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py index 8d839ab5..9105bd8e 100644 --- a/cognee/tests/test_pgvector.py +++ b/cognee/tests/test_pgvector.py @@ -9,14 +9,19 @@ logging.basicConfig(level=logging.DEBUG) async def main(): - #TODO: Should this be set on pipeline side or in the test? - cognee.config.set_vector_engine_provider("pgvector") - cognee.config.set_db_provider("postgres") - cognee.config.set_db_name("cognee_db") - cognee.config.set_db_host("127.0.0.1") - cognee.config.set_db_port("5432") - cognee.config.set_db_username("cognee") - cognee.config.set_db_password("cognee") + cognee.config.set_vector_db_config({ "vector_db_url": "", + "vector_db_key": "", + "vector_engine_provider": "pgvector" + } + ) + cognee.config.set_relational_db_config({"db_path": "", + "db_name": "cognee_db", + "db_host": "127.0.0.1", + "db_port": "5432", + "db_username": "cognee", + "db_password": "cognee", + "db_provider": "postgres"} + ) data_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_pgvector")).resolve()) cognee.config.data_root_directory(data_directory_path) From 7b2022e5f3f5ee747657cca57aa9bf648ee9c7d9 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 12:24:51 +0200 Subject: [PATCH 22/32] refactor: Move psycopg2 to an optional dependency Moved psycopg2 as an optional dependency for postgres to not block users with lib issues it might cause Refactor #COG-170 --- poetry.lock | 4 ++-- pyproject.toml | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index fc2f8d5d..d4d52f54 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. 
[[package]] name = "aiofiles" @@ -7765,4 +7765,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<3.12" -content-hash = "505c4f6f2128aea335ea6065221cf4692750e374f1b09667d250b6f77f3e4b59" +content-hash = "9ce846c0cdd6f980fac43d22ea1f046f485fb42270a5cfe04fed061ea74a4f8c" diff --git a/pyproject.toml b/pyproject.toml index 898ad570..33b8b6a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,7 +71,6 @@ fastapi-users = { version = "*", extras = ["sqlalchemy"] } asyncpg = "^0.29.0" alembic = "^1.13.3" pgvector = "^0.3.5" -psycopg2 = "^2.9.10" @@ -107,6 +106,11 @@ diskcache = "^5.6.3" pandas = "2.0.3" tabulate = "^0.9.0" +[tool.poetry.group.postgres] +optional = true + +[tool.poetry.group.postgres.dependencies] +psycopg2 = "^2.9.10" [tool.ruff] # https://beta.ruff.rs/docs/ line-length = 100 From 8002db72c39328fab796ec6a11f7ff4286c5ccf5 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 12:46:45 +0200 Subject: [PATCH 23/32] chore: Add installing of depdendencies along with postgres group Since the postgres related libraries are no longer mandatory specification is needed for them to be installed as well Chore #COG-170 --- .github/workflows/test_pgvector.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_pgvector.yml b/.github/workflows/test_pgvector.yml index 401be800..067903da 100644 --- a/.github/workflows/test_pgvector.yml +++ b/.github/workflows/test_pgvector.yml @@ -58,7 +58,7 @@ jobs: installer-parallel: true - name: Install dependencies - run: poetry install --no-interaction + run: poetry install --with postgres --no-interaction - name: Run default PGVector env: From dbc86e284a96bc737006cb7a86d0516c5ee5abe8 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 12:51:26 +0200 Subject: [PATCH 24/32] chore: Add pgvector back to mandatory dependencies pgvector is imported directly in PGVector adapter therefore it needs to be installed with mandatory dependencies Chore #COG-170 --- poetry.lock | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/poetry.lock b/poetry.lock index af21e41e..0ef2e0ab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -7765,4 +7765,4 @@ weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<3.12" -content-hash = "34f873038262904af283d31d4c3c68fd535ca7b36c6837fc97c0d1cc31cf89cb" +content-hash = "4daa7346dc90750bb6b82ef353f321da2370b60515396d2e1390a3f35510ce55" diff --git a/pyproject.toml b/pyproject.toml index 7c88f23b..65a6e7be 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ sentry-sdk = {extras = ["fastapi"], version = "^2.9.0"} fastapi-users = { version = "*", extras = ["sqlalchemy"] } asyncpg = "^0.29.0" alembic = "^1.13.3" +pgvector = "^0.3.5" [tool.poetry.extras] filesystem = ["s3fs", "botocore"] @@ -107,7 +108,6 @@ optional = true [tool.poetry.group.postgres.dependencies] psycopg2 = "^2.9.10" -pgvector = "^0.3.5" [tool.ruff] # https://beta.ruff.rs/docs/ line-length = 100 From c7ed46ddaab4c91bb7dd8e9f360ea6397416860d Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 12:57:17 +0200 Subject: [PATCH 25/32] fix: Change to new syntax for vector_engine_provider Syntax changed for vector_engine_provider to vector_db_provider Fix #COG-170 --- .../databases/vector/pgvector/create_db_and_tables.py | 2 +- cognee/tests/test_pgvector.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py b/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py index 1b09e873..dd029de0 100644 --- a/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py +++ b/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py @@ -6,7 +6,7 @@ async def create_db_and_tables(): vector_config = get_vectordb_config() vector_engine = get_vector_engine() - if vector_config.vector_engine_provider == "pgvector": + if vector_config.vector_db_provider == "pgvector": async with vector_engine.engine.begin() as connection: if len(Base.metadata.tables.keys()) > 0: await connection.run_sync(Base.metadata.create_all) diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py index 9105bd8e..88195bbc 100644 --- a/cognee/tests/test_pgvector.py +++ b/cognee/tests/test_pgvector.py @@ -11,7 +11,7 @@ async def main(): cognee.config.set_vector_db_config({ "vector_db_url": "", "vector_db_key": "", - "vector_engine_provider": "pgvector" + "vector_db_provider": "pgvector" } ) cognee.config.set_relational_db_config({"db_path": "", From 6b9a14270daeef151858de87d91aeeca5abd2a97 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 13:45:23 +0200 Subject: [PATCH 26/32] refactor: Fix spacing, remove unused config methods Remove unused config methods so we don't have to maintain them. Fixed spacing in test_pgvector integration test. Refactor #COG-170 --- cognee/api/v1/config/config.py | 35 ---------------------------- cognee/tests/test_pgvector.py | 42 +++++++++++++++++++++++----------- 2 files changed, 29 insertions(+), 48 deletions(-) diff --git a/cognee/api/v1/config/config.py b/cognee/api/v1/config/config.py index 8d5f7dea..2f4167b7 100644 --- a/cognee/api/v1/config/config.py +++ b/cognee/api/v1/config/config.py @@ -39,41 +39,6 @@ def set_classification_model(classification_model: object): cognify_config = get_cognify_config() cognify_config.classification_model = classification_model - @staticmethod - def set_db_name(db_name: str): - cognify_config = get_relational_config() - cognify_config.db_name = db_name - - @staticmethod - def set_db_path(db_path: str): - cognify_config = get_relational_config() - cognify_config.db_path = db_path - - @staticmethod - def set_db_host(db_host: str): - cognify_config = get_relational_config() - cognify_config.db_host = db_host - - @staticmethod - def set_db_port(db_port: str): - cognify_config = get_relational_config() - cognify_config.db_port = db_port - - @staticmethod - def set_db_username(db_username: str): - cognify_config = get_relational_config() - cognify_config.db_username = db_username - - @staticmethod - def set_db_password(db_password: str): - cognify_config = get_relational_config() - cognify_config.db_password = db_password - - @staticmethod - def set_db_provider(db_provider: str): - cognify_config = get_relational_config() - cognify_config.db_provider = db_provider - @staticmethod def set_summarization_model(summarization_model: object): cognify_config = get_cognify_config() diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py index 88195bbc..963d55c6 100644 --- a/cognee/tests/test_pgvector.py +++ b/cognee/tests/test_pgvector.py @@ -1,5 +1,3 @@ - - import os import logging import pathlib @@ -8,32 +6,48 @@ logging.basicConfig(level=logging.DEBUG) + async def main(): - cognee.config.set_vector_db_config({ "vector_db_url": "", - "vector_db_key": "", + cognee.config.set_vector_db_config( + { + "vector_db_url": "", + 
"vector_db_key": "", "vector_db_provider": "pgvector" } ) - cognee.config.set_relational_db_config({"db_path": "", + cognee.config.set_relational_db_config( + { + "db_path": "", "db_name": "cognee_db", "db_host": "127.0.0.1", "db_port": "5432", "db_username": "cognee", "db_password": "cognee", - "db_provider": "postgres"} + "db_provider": "postgres", + } ) - data_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_pgvector")).resolve()) + data_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_pgvector") + ).resolve() + ) cognee.config.data_root_directory(data_directory_path) - cognee_directory_path = str(pathlib.Path(os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_pgvector")).resolve()) + cognee_directory_path = str( + pathlib.Path( + os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_pgvector") + ).resolve() + ) cognee.config.system_root_directory(cognee_directory_path) await cognee.prune.prune_data() - await cognee.prune.prune_system(metadata = True) + await cognee.prune.prune_system(metadata=True) dataset_name = "cs_explanations" - explanation_file_path = os.path.join(pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt") + explanation_file_path = os.path.join( + pathlib.Path(__file__).parent, "test_data/Natural_language_processing.txt" + ) await cognee.add([explanation_file_path], dataset_name) text = """A quantum computer is a computer that takes advantage of quantum mechanical phenomena. @@ -49,23 +63,24 @@ async def main(): await cognee.cognify([dataset_name]) from cognee.infrastructure.databases.vector import get_vector_engine + vector_engine = get_vector_engine() random_node = (await vector_engine.search("entities", "AI"))[0] random_node_name = random_node.payload["name"] - search_results = await cognee.search(SearchType.INSIGHTS, query = random_node_name) + search_results = await cognee.search(SearchType.INSIGHTS, query=random_node_name) assert len(search_results) != 0, "The search results list is empty." print("\n\nExtracted sentences are:\n") for result in search_results: print(f"{result}\n") - search_results = await cognee.search(SearchType.CHUNKS, query = random_node_name) + search_results = await cognee.search(SearchType.CHUNKS, query=random_node_name) assert len(search_results) != 0, "The search results list is empty." print("\n\nExtracted chunks are:\n") for result in search_results: print(f"{result}\n") - search_results = await cognee.search(SearchType.SUMMARIES, query = random_node_name) + search_results = await cognee.search(SearchType.SUMMARIES, query=random_node_name) assert len(search_results) != 0, "Query related summaries don't exist." 
print("\n\Extracted summaries are:\n") for result in search_results: @@ -74,4 +89,5 @@ async def main(): if __name__ == "__main__": import asyncio + asyncio.run(main()) From c78627f9260ec088935a13bd7c50a44fe260381b Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 14:02:36 +0200 Subject: [PATCH 27/32] chore: Remove postgres group from pyproject.toml install postgres dependencies through extras To be uniform with how we install optional dependencies a change has been made so that postgres optional dependencies are installed through extras Chore #COG-170 --- poetry.lock | 5 +++-- pyproject.toml | 8 ++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index 0ef2e0ab..acd56e02 100644 --- a/poetry.lock +++ b/poetry.lock @@ -4932,7 +4932,7 @@ test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] name = "psycopg2" version = "2.9.10" description = "psycopg2 - Python-PostgreSQL Database Adapter" -optional = false +optional = true python-versions = ">=3.8" files = [ {file = "psycopg2-2.9.10-cp310-cp310-win32.whl", hash = "sha256:5df2b672140f95adb453af93a7d669d7a7bf0a56bcd26f1502329166f4a61716"}, @@ -7759,10 +7759,11 @@ cli = [] filesystem = [] neo4j = ["neo4j"] notebook = ["overrides"] +postgres = ["psycopg2"] qdrant = ["qdrant-client"] weaviate = ["weaviate-client"] [metadata] lock-version = "2.0" python-versions = ">=3.9.0,<3.12" -content-hash = "4daa7346dc90750bb6b82ef353f321da2370b60515396d2e1390a3f35510ce55" +content-hash = "70a0072dce8de95d64b862f9a9df48aaec84c8d8515ae018fce4426a0dcacf88" diff --git a/pyproject.toml b/pyproject.toml index 65a6e7be..22074959 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -71,6 +71,7 @@ fastapi-users = { version = "*", extras = ["sqlalchemy"] } asyncpg = "^0.29.0" alembic = "^1.13.3" pgvector = "^0.3.5" +psycopg2 = {version = "^2.9.10", optional = true} [tool.poetry.extras] filesystem = ["s3fs", "botocore"] @@ -78,6 +79,7 @@ cli = ["pipdeptree", "cron-descriptor"] weaviate = ["weaviate-client"] qdrant = ["qdrant-client"] neo4j = ["neo4j"] +postgres = ["psycopg2"] notebook = ["ipykernel", "overrides", "ipywidgets", "jupyterlab", "jupyterlab_widgets", "jupyterlab-server", "jupyterlab-git"] [tool.poetry.group.dev.dependencies] @@ -103,12 +105,6 @@ diskcache = "^5.6.3" pandas = "2.0.3" tabulate = "^0.9.0" -[tool.poetry.group.postgres] -optional = true - -[tool.poetry.group.postgres.dependencies] -psycopg2 = "^2.9.10" - [tool.ruff] # https://beta.ruff.rs/docs/ line-length = 100 ignore = ["F401"] From d30c337370d3afe82dbaf6dde133ee56b8c7a91c Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 14:23:36 +0200 Subject: [PATCH 28/32] refactor: Use SQLAlchemyAdapter create_database Added reusing of sqlalchemy adapters create_database method instead of directly writing code for database creation Refactor #COG-170 --- .../databases/vector/pgvector/create_db_and_tables.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py b/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py index dd029de0..99d53d69 100644 --- a/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py +++ b/cognee/infrastructure/databases/vector/pgvector/create_db_and_tables.py @@ -7,9 +7,8 @@ async def create_db_and_tables(): vector_engine = get_vector_engine() if vector_config.vector_db_provider == "pgvector": + vector_engine.create_database() async with vector_engine.engine.begin() as connection: - if 
len(Base.metadata.tables.keys()) > 0: - await connection.run_sync(Base.metadata.create_all) await connection.execute(text("CREATE EXTENSION IF NOT EXISTS vector;")) From 0e1533aa44bf88fb7961b79e193a7d05a8717c3a Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 14:27:36 +0200 Subject: [PATCH 29/32] chore: Update how postgres dependencies are installed in integration test for pgvector Updated how postgres dependencies are installed due to change in pyproject.toml Chore #COG-170 --- .github/workflows/test_pgvector.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test_pgvector.yml b/.github/workflows/test_pgvector.yml index 067903da..913d249e 100644 --- a/.github/workflows/test_pgvector.yml +++ b/.github/workflows/test_pgvector.yml @@ -58,7 +58,7 @@ jobs: installer-parallel: true - name: Install dependencies - run: poetry install --with postgres --no-interaction + run: poetry install -E postgres --no-interaction - name: Run default PGVector env: From 195929e2455b0b119339bc586d6ba92903f90f02 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 15:12:45 +0200 Subject: [PATCH 30/32] refactor: Fix typo Refactor fix typo in pgvector integration test Refactor #COG-170 --- cognee/tests/test_pgvector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/tests/test_pgvector.py b/cognee/tests/test_pgvector.py index 963d55c6..02d292d6 100644 --- a/cognee/tests/test_pgvector.py +++ b/cognee/tests/test_pgvector.py @@ -82,7 +82,7 @@ async def main(): search_results = await cognee.search(SearchType.SUMMARIES, query=random_node_name) assert len(search_results) != 0, "Query related summaries don't exist." - print("\n\Extracted summaries are:\n") + print("\n\nExtracted summaries are:\n") for result in search_results: print(f"{result}\n") From dc46304a8da9e4e36b5c22973a13ae25c7515335 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 Oct 2024 15:15:45 +0200 Subject: [PATCH 31/32] fix: Add missing await statement to LanceDBAdapter and PGVectorAdapter Added missing await statement to batch search for LanceDB and PGVector adapters Fix #COG-170 --- .../infrastructure/databases/vector/lancedb/LanceDBAdapter.py | 2 +- .../infrastructure/databases/vector/pgvector/PGVectorAdapter.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cognee/infrastructure/databases/vector/lancedb/LanceDBAdapter.py b/cognee/infrastructure/databases/vector/lancedb/LanceDBAdapter.py index 3bb47fcc..40463448 100644 --- a/cognee/infrastructure/databases/vector/lancedb/LanceDBAdapter.py +++ b/cognee/infrastructure/databases/vector/lancedb/LanceDBAdapter.py @@ -152,7 +152,7 @@ async def batch_search( ): query_vectors = await self.embedding_engine.embed_text(query_texts) - return asyncio.gather( + return await asyncio.gather( *[self.search( collection_name = collection_name, query_vector = query_vector, diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index bd10cf0e..8b0ddcd8 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -200,7 +200,7 @@ async def batch_search( ): query_vectors = await self.embedding_engine.embed_text(query_texts) - return asyncio.gather( + return await asyncio.gather( *[ self.search( collection_name=collection_name, From 0c6f0198382e0b9082d607669f5516a45eab149b Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 22 
Oct 2024 15:41:07 +0200 Subject: [PATCH 32/32] refactor: Remove broad exception handling from PGVectorAdapter Removed broad exception handling for the search function in PGVectorAdapter Refactor #COG-170 --- .../vector/pgvector/PGVectorAdapter.py | 55 +++++++++---------- 1 file changed, 25 insertions(+), 30 deletions(-) diff --git a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py index 8b0ddcd8..b13346cf 100644 --- a/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py +++ b/cognee/infrastructure/databases/vector/pgvector/PGVectorAdapter.py @@ -157,39 +157,34 @@ async def search( # Use async session to connect to the database async with self.get_async_session() as session: - try: - # Get PGVectorDataPoint Table from database - PGVectorDataPoint = await self.get_table(collection_name) - - # Find closest vectors to query_vector - closest_items = await session.execute( - select( - PGVectorDataPoint, - PGVectorDataPoint.c.vector.cosine_distance(query_vector).label( - "similarity" - ), - ) - .order_by("similarity") - .limit(limit) - ) + # Get PGVectorDataPoint Table from database + PGVectorDataPoint = await self.get_table(collection_name) - vector_list = [] - # Extract distances and find min/max for normalization - for vector in closest_items: - # TODO: Add normalization of similarity score - vector_list.append(vector) + # Find closest vectors to query_vector + closest_items = await session.execute( + select( + PGVectorDataPoint, + PGVectorDataPoint.c.vector.cosine_distance(query_vector).label( + "similarity" + ), + ) + .order_by("similarity") + .limit(limit) + ) - # Create and return ScoredResult objects - return [ - ScoredResult( - id=str(row.id), payload=row.payload, score=row.similarity - ) - for row in vector_list - ] + vector_list = [] + # Extract distances and find min/max for normalization + for vector in closest_items: + # TODO: Add normalization of similarity score + vector_list.append(vector) - except Exception as e: - print(f"Error during search: {e}") - return [] + # Create and return ScoredResult objects + return [ + ScoredResult( + id=str(row.id), payload=row.payload, score=row.similarity + ) + for row in vector_list + ] async def batch_search( self,
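
Taken together, the series wires pgvector in as the vector store on top of the shared Postgres instance (the vector engine builds its connection string from the relational config). Below is a minimal end-to-end sketch mirroring the configuration used in cognee/tests/test_pgvector.py and the pgvector workflow. The host, port, credentials, dataset name, and sample text are the local-dev/test values or illustrative placeholders, not production defaults; the snippet assumes a running Postgres started from the pgvector/pgvector:pg17 image and an LLM_API_KEY in the environment.

    import asyncio

    import cognee
    from cognee.api.v1.search import SearchType

    # pgvector reuses the relational Postgres connection, so URL and key stay empty.
    cognee.config.set_vector_db_config({
        "vector_db_url": "",
        "vector_db_key": "",
        "vector_db_provider": "pgvector",
    })

    # Local-dev credentials matching the Postgres service in the test workflow.
    cognee.config.set_relational_db_config({
        "db_path": "",
        "db_name": "cognee_db",
        "db_host": "127.0.0.1",
        "db_port": "5432",
        "db_username": "cognee",
        "db_password": "cognee",
        "db_provider": "postgres",
    })

    async def demo():
        # Dataset name and sample text are illustrative only.
        await cognee.add(["Natural language processing is a subfield of AI."], "demo_dataset")
        await cognee.cognify(["demo_dataset"])
        for result in await cognee.search(SearchType.CHUNKS, query="language"):
            print(result)

    if __name__ == "__main__":
        asyncio.run(demo())

The psycopg2 driver ships behind the optional postgres extra, so an environment targeting this setup installs with "poetry install -E postgres", as the pgvector workflow does after patch 29/32.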