
Commit

Merge pull request #837 from ParisNeo/main
Fixes and Enhancements for PostgreSQL and JSON Document Storage
YanSte authored Feb 18, 2025
2 parents e1e00a8 + f7ef4c7 commit 99dc485
Showing 3 changed files with 69 additions and 14 deletions.
lightrag/api/docs/LightRagWithPostGRESQL.md (2 changes: 1 addition & 1 deletion)
@@ -130,7 +130,7 @@ Replace placeholders like `your_role_name`, `your_password`, and `your_database`
 Start the LightRAG server using specified options:
 
 ```bash
-lightrag-server --port 9626 --key sk-SL1 --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
+lightrag-server --port 9621 --key sk-somepassword --kv-storage PGKVStorage --graph-storage PGGraphStorage --vector-storage PGVectorStorage --doc-status-storage PGDocStatusStorage
 ```
 
 Replace `the-port-number` with your desired port number (default is 9621) and `your-secret-key` with a secure key.
lightrag/kg/json_doc_status_impl.py (4 changes: 4 additions & 0 deletions)
@@ -68,3 +68,7 @@ async def delete(self, doc_ids: list[str]):
         for doc_id in doc_ids:
             self._data.pop(doc_id, None)
         await self.index_done_callback()
+
+    async def drop(self) -> None:
+        """Drop the storage"""
+        self._data.clear()
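The new `drop()` hook for the JSON backend simply empties the in-memory dict, complementing the existing per-id `delete()`. Below is a minimal standalone sketch of that pattern; the class, its fields, and the `main()` driver are illustrative only and are not the project's `JsonDocStatusStorage`.

```python
import asyncio
from typing import Any


class InMemoryDocStatusStore:
    """Toy stand-in for an in-memory doc-status storage backend."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Any]] = {}

    async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
        # Merge incoming records into the in-memory dict.
        self._data.update(data)

    async def delete(self, doc_ids: list[str]) -> None:
        # Remove selected documents, ignoring unknown ids.
        for doc_id in doc_ids:
            self._data.pop(doc_id, None)

    async def drop(self) -> None:
        # Drop the storage: discard every record at once.
        self._data.clear()


async def main() -> None:
    store = InMemoryDocStatusStore()
    await store.upsert({"doc-1": {"status": "processed"}, "doc-2": {"status": "pending"}})
    await store.delete(["doc-1"])
    await store.drop()
    print(len(store._data))  # 0: nothing survives drop()


if __name__ == "__main__":
    asyncio.run(main())
```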
lightrag/kg/postgres_impl.py (77 changes: 64 additions & 13 deletions)
@@ -263,8 +263,8 @@ async def filter_keys(self, keys: set[str]) -> set[str]:
                 exist_keys = [key["id"] for key in res]
             else:
                 exist_keys = []
-            data = set([s for s in keys if s not in exist_keys])
-            return data
+            new_keys = set([s for s in keys if s not in exist_keys])
+            return new_keys
         except Exception as e:
             logger.error(f"PostgreSQL database error: {e}")
             print(sql)
@@ -301,6 +301,11 @@ async def index_done_callback(self) -> None:
         # PG handles persistence automatically
         pass
 
+    async def drop(self) -> None:
+        """Drop the storage"""
+        drop_sql = SQL_TEMPLATES["drop_all"]
+        await self.db.execute(drop_sql)
+
 
 @final
 @dataclass
@@ -432,16 +437,26 @@ async def delete_entity_relation(self, entity_name: str) -> None:
 @dataclass
 class PGDocStatusStorage(DocStatusStorage):
     async def filter_keys(self, keys: set[str]) -> set[str]:
-        """Return keys that don't exist in storage"""
-        keys = ",".join([f"'{_id}'" for _id in keys])
-        sql = f"SELECT id FROM LIGHTRAG_DOC_STATUS WHERE workspace='{self.db.workspace}' AND id IN ({keys})"
-        result = await self.db.query(sql, multirows=True)
-        # The result is like [{'id': 'id1'}, {'id': 'id2'}, ...].
-        if result is None:
-            return set(keys)
-        else:
-            existed = set([element["id"] for element in result])
-            return set(keys) - existed
+        """Filter out duplicated content"""
+        sql = SQL_TEMPLATES["filter_keys"].format(
+            table_name=namespace_to_table_name(self.namespace),
+            ids=",".join([f"'{id}'" for id in keys]),
+        )
+        params = {"workspace": self.db.workspace}
+        try:
+            res = await self.db.query(sql, params, multirows=True)
+            if res:
+                exist_keys = [key["id"] for key in res]
+            else:
+                exist_keys = []
+            new_keys = set([s for s in keys if s not in exist_keys])
+            print(f"keys: {keys}")
+            print(f"new_keys: {new_keys}")
+            return new_keys
+        except Exception as e:
+            logger.error(f"PostgreSQL database error: {e}")
+            print(sql)
+            print(params)
 
     async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
         sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and id=$2"
@@ -483,7 +498,7 @@ async def get_docs_by_status(
         sql = "select * from LIGHTRAG_DOC_STATUS where workspace=$1 and status=$2"
         params = {"workspace": self.db.workspace, "status": status.value}
         result = await self.db.query(sql, params, True)
-        return {
+        docs_by_status = {
             element["id"]: DocProcessingStatus(
                 content=result[0]["content"],
                 content_summary=element["content_summary"],
@@ -495,6 +510,7 @@ )
             )
             for element in result
         }
+        return docs_by_status
 
     async def index_done_callback(self) -> None:
         # PG handles persistence automatically
@@ -531,6 +547,11 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
         )
         return data
 
+    async def drop(self) -> None:
+        """Drop the storage"""
+        drop_sql = SQL_TEMPLATES["drop_doc_full"]
+        await self.db.execute(drop_sql)
+
 
 class PGGraphQueryException(Exception):
     """Exception for the AGE queries."""
@@ -1012,6 +1033,13 @@ async def get_knowledge_graph(
     ) -> KnowledgeGraph:
         raise NotImplementedError
 
+    async def drop(self) -> None:
+        """Drop the storage"""
+        drop_sql = SQL_TEMPLATES["drop_vdb_entity"]
+        await self.db.execute(drop_sql)
+        drop_sql = SQL_TEMPLATES["drop_vdb_relation"]
+        await self.db.execute(drop_sql)
+
 
 NAMESPACE_TABLE_MAP = {
     NameSpace.KV_STORE_FULL_DOCS: "LIGHTRAG_DOC_FULL",
@@ -1194,4 +1222,27 @@ def namespace_to_table_name(namespace: str) -> str:
         FROM LIGHTRAG_DOC_CHUNKS where workspace=$1)
         WHERE distance>$2 ORDER BY distance DESC LIMIT $3
     """,
+    # DROP tables
+    "drop_all": """
+        DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
+        DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
+        DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
+        DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
+        DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
+    """,
+    "drop_doc_full": """
+        DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
+    """,
+    "drop_doc_chunks": """
+        DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
+    """,
+    "drop_llm_cache": """
+        DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
+    """,
+    "drop_vdb_entity": """
+        DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
+    """,
+    "drop_vdb_relation": """
+        DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
+    """,
 }
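To make the link between the new `drop()` methods and these templates concrete, here is a minimal standalone sketch that runs the multi-statement `drop_all` SQL with asyncpg. The `drop_all_tables()` helper, the DSN placeholder, and the direct asyncpg usage are assumptions for illustration; the code in the diff above routes the same SQL through the storage classes' `self.db.execute(...)` wrapper instead.

```python
import asyncio

import asyncpg  # driver assumed for this sketch

# Copied from the "drop_all" template added in the diff above.
DROP_ALL_SQL = """
    DROP TABLE IF EXISTS LIGHTRAG_DOC_FULL CASCADE;
    DROP TABLE IF EXISTS LIGHTRAG_DOC_CHUNKS CASCADE;
    DROP TABLE IF EXISTS LIGHTRAG_LLM_CACHE CASCADE;
    DROP TABLE IF EXISTS LIGHTRAG_VDB_ENTITY CASCADE;
    DROP TABLE IF EXISTS LIGHTRAG_VDB_RELATION CASCADE;
"""


async def drop_all_tables(dsn: str) -> None:
    """Run the multi-statement DROP template against a PostgreSQL database."""
    conn = await asyncpg.connect(dsn)
    try:
        # With no bind parameters, asyncpg executes the semicolon-separated
        # statements as one script.
        await conn.execute(DROP_ALL_SQL)
    finally:
        await conn.close()


if __name__ == "__main__":
    # Placeholder DSN: substitute your own host, user, password, and database.
    asyncio.run(drop_all_tables("postgresql://user:password@localhost:5432/lightrag"))
```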
