Skip to content

Commit

Permalink
add sqlalchemy engine
Browse files Browse the repository at this point in the history
  • Loading branch information
Vasilije1990 committed Jul 21, 2024
1 parent 866270b commit 77e8c1b
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 8 deletions.
5 changes: 4 additions & 1 deletion cognee/infrastructure/databases/relational/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ class RelationalConfig(BaseSettings):
db_port: str = "5432"
db_user: str = "cognee"
db_password: str = "cognee"
database_engine: object = create_relational_engine(db_path, db_name)
db_provider: str = "duckdb"
database_engine: object = create_relational_engine(db_path, db_name, db_provider)
db_file_path: str = os.path.join(db_path, db_name)


model_config = SettingsConfigDict(env_file = ".env", extra = "allow")

def create_engine(self):
Expand All @@ -29,6 +31,7 @@ def to_dict(self) -> dict:
"db_user": self.db_user,
"db_password": self.db_password,
"db_engine": self.database_engine,
"db_provider": self.db_provider,
}

@lru_cache
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,33 @@
from enum import Enum

from cognee.infrastructure.databases.relational.sqlalchemy.SqlAlchemyAdapter import SQLAlchemyAdapter
from cognee.infrastructure.files.storage import LocalStorage
from cognee.infrastructure.databases.relational import DuckDBAdapter
from cognee.infrastructure.databases.relational import DuckDBAdapter, get_relationaldb_config


class DBProvider(Enum):
DUCKDB = "duckdb"
POSTGRES = "postgres"



def create_relational_engine(db_path: str, db_name: str):
def create_relational_engine(db_path: str, db_name: str, db_provider:str):
LocalStorage.ensure_directory_exists(db_path)

return DuckDBAdapter(
db_name = db_name,
db_path = db_path,
)
llm_config = get_relationaldb_config()

provider = DBProvider(llm_config.llm_provider)


if provider == DBProvider.DUCKDB:

return DuckDBAdapter(
db_name = db_name,
db_path = db_path,
)
elif provider == DBProvider.POSTGRES:
return SQLAlchemyAdapter(
db_name = db_name,
db_path = db_path,
db_type = db_provider,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import os
from sqlalchemy import create_engine, MetaData, Table, Column, String, Boolean, TIMESTAMP, text
from sqlalchemy.orm import sessionmaker

class SQLAlchemyAdapter():
def __init__(self, db_type: str, db_path: str, db_name: str):
self.db_location = os.path.abspath(os.path.join(db_path, db_name))
self.engine = create_engine(f"{db_type}:///{self.db_location}")
self.Session = sessionmaker(bind=self.engine)

def get_datasets(self):
with self.engine.connect() as connection:
result = connection.execute(text("SELECT DISTINCT schema_name FROM information_schema.tables;"))
tables = [row['schema_name'] for row in result]
return list(
filter(
lambda schema_name: not schema_name.endswith("staging") and schema_name != "cognee",
tables
)
)

def get_files_metadata(self, dataset_name: str):
with self.engine.connect() as connection:
result = connection.execute(text(f"SELECT id, name, file_path, extension, mime_type FROM {dataset_name}.file_metadata;"))
return [dict(row) for row in result]

def create_table(self, schema_name: str, table_name: str, table_config: list[dict]):
fields_query_parts = [f"{item['name']} {item['type']}" for item in table_config]
with self.engine.connect() as connection:
connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name};"))
connection.execute(text(f"CREATE TABLE IF NOT EXISTS {schema_name}.{table_name} ({', '.join(fields_query_parts)});"))

def delete_table(self, table_name: str):
with self.engine.connect() as connection:
connection.execute(text(f"DROP TABLE IF EXISTS {table_name};"))

def insert_data(self, schema_name: str, table_name: str, data: list[dict]):
columns = ", ".join(data[0].keys())
values = ", ".join([f"({', '.join([f':{key}' for key in row.keys()])})" for row in data])
insert_query = text(f"INSERT INTO {schema_name}.{table_name} ({columns}) VALUES {values};")
with self.engine.connect() as connection:
connection.execute(insert_query, data)

def get_data(self, table_name: str, filters: dict = None):
with self.engine.connect() as connection:
query = f"SELECT * FROM {table_name}"
if filters:
filter_conditions = " AND ".join([
f"{key} IN ({', '.join([f':{key}{i}' for i in range(len(value))])})" if isinstance(value, list)
else f"{key} = :{key}" for key, value in filters.items()
])
query += f" WHERE {filter_conditions};"
query = text(query)
results = connection.execute(query, filters)
else:
query += ";"
query = text(query)
results = connection.execute(query)
return {result["data_id"]: result["status"] for result in results}

def execute_query(self, query):
with self.engine.connect() as connection:
result = connection.execute(text(query))
return [dict(row) for row in result]

def load_cognify_data(self, data):
metadata = MetaData()
cognify_table = Table(
'cognify', metadata,
Column('document_id', String),
Column('layer_id', String),
Column('created_at', TIMESTAMP, server_default=text('CURRENT_TIMESTAMP')),
Column('updated_at', TIMESTAMP, nullable=True, default=None),
Column('processed', Boolean, default=False),
Column('document_id_target', String, nullable=True)
)
metadata.create_all(self.engine)
insert_query = cognify_table.insert().values(document_id=text(':document_id'), layer_id=text(':layer_id'))
with self.engine.connect() as connection:
connection.execute(insert_query, data)

def fetch_cognify_data(self, excluded_document_id: str):
with self.engine.connect() as connection:
connection.execute(text("""
CREATE TABLE IF NOT EXISTS cognify (
document_id STRING,
layer_id STRING,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
document_id_target STRING NULL
);
"""))
query = text(f"""
SELECT document_id, layer_id, created_at, updated_at, processed
FROM cognify
WHERE document_id != :excluded_document_id AND processed = FALSE;
""")
records = connection.execute(query, {'excluded_document_id': excluded_document_id}).fetchall()
if records:
document_ids = tuple(record['document_id'] for record in records)
update_query = text(f"UPDATE cognify SET processed = TRUE WHERE document_id IN :document_ids;")
connection.execute(update_query, {'document_ids': document_ids})
return [dict(record) for record in records]

def delete_cognify_data(self):
with self.engine.connect() as connection:
connection.execute(text("""
CREATE TABLE IF NOT EXISTS cognify (
document_id STRING,
layer_id STRING,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT NULL,
processed BOOLEAN DEFAULT FALSE,
document_id_target STRING NULL
);
"""))
connection.execute(text("DELETE FROM cognify;"))
connection.execute(text("DROP TABLE cognify;"))

def delete_database(self):
from cognee.infrastructure.files.storage import LocalStorage
LocalStorage.remove(self.db_location)
if LocalStorage.file_exists(self.db_location + ".wal"):
LocalStorage.remove(self.db_location + ".wal")
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from cognee.infrastructure.databases.relational import get_relationaldb_config
from cognee.infrastructure.databases.relational.create_relational_engine import create_relational_engine

DATABASE_URL = "sqlite+aiosqlite:///./test.db"


Expand All @@ -17,8 +20,10 @@ class User(SQLAlchemyBaseUserTableUUID, Base):



llm_config = get_relationaldb_config()


engine = create_async_engine(DATABASE_URL)
engine = create_relational_engine(llm_config.db_path, llm_config.db_name, llm_config.db_provider)
async_session_maker = async_sessionmaker(engine, expire_on_commit=False)


Expand Down

0 comments on commit 77e8c1b

Please sign in to comment.