Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExecutionStore + MongoDB integration tests and fixes #5095

Merged
merged 13 commits into from
Nov 14, 2024
20 changes: 8 additions & 12 deletions fiftyone/factory/repo_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,12 @@ def delegated_operation_repo() -> DelegatedOperationRepo:
@staticmethod
def execution_store_repo(
dataset_id: Optional[ObjectId] = None,
collection_name: Optional[str] = None,
) -> ExecutionStoreRepo:
if (
MongoExecutionStoreRepo.COLLECTION_NAME
not in RepositoryFactory.repos
):
RepositoryFactory.repos[
MongoExecutionStoreRepo.COLLECTION_NAME
] = MongoExecutionStoreRepo(
collection=_get_db()[MongoExecutionStoreRepo.COLLECTION_NAME],
dataset_id=dataset_id,
)

return RepositoryFactory.repos[MongoExecutionStoreRepo.COLLECTION_NAME]
collection = _get_db()[
collection_name or MongoExecutionStoreRepo.COLLECTION_NAME
]
return MongoExecutionStoreRepo(
collection=collection,
dataset_id=dataset_id,
)
112 changes: 92 additions & 20 deletions fiftyone/factory/repos/execution_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from bson import ObjectId
from pymongo.collection import Collection
from typing import Any, Dict

from fiftyone.operators.store.models import StoreDocument, KeyDocument

Expand All @@ -34,23 +35,42 @@ def __init__(self, collection: Collection, dataset_id: ObjectId = None):
self._collection = collection
self._dataset_id = dataset_id

def create_store(self, store_name) -> StoreDocument:
def create_store(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a PR where @imanjra plans to change this to def store() so just a heads up in case a conflict arises

self, store_name, metadata: Dict[str, Any] = None
) -> StoreDocument:
"""Creates a store associated with the current context."""
store_doc = StoreDocument(
store_name=store_name,
dataset_id=self._dataset_id,
value=metadata,
)
self._collection.insert_one(store_doc.to_mongo_dict())
return store_doc

def has_store(self, store_name):
def get_store(self, store_name) -> StoreDocument:
brimoor marked this conversation as resolved.
Show resolved Hide resolved
"""Gets a store associated with the current context."""
raw_store_doc = self._collection.find_one(
dict(
store_name=store_name,
key="__store__",
dataset_id=self._dataset_id,
)
)
if not raw_store_doc and self.has_store(store_name):
return StoreDocument(
store_name=store_name, dataset_id=self._dataset_id
)

store_doc = StoreDocument(**raw_store_doc) if raw_store_doc else None
return store_doc

def has_store(self, store_name) -> bool:
"""Checks whether a store with the given name exists in the current
context.
"""
result = self._collection.find_one(
dict(
store_name=store_name,
key="__store__",
dataset_id=self._dataset_id,
)
)
Expand All @@ -66,9 +86,25 @@ def list_stores(self) -> list[str]:

def count_stores(self) -> int:
"""Counts the stores associated with the current context."""
return self._collection.count_documents(
dict(key="__store__", dataset_id=self._dataset_id),
)
pipeline = [
{
"$match": {
"dataset_id": self._dataset_id,
}
},
{
"$group": {
"_id": {
"store_name": "$store_name",
"dataset_id": "$dataset_id",
}
}
},
{"$count": "total_stores"},
]

result = list(self._collection.aggregate(pipeline))
return result[0]["total_stores"] if result else 0

def delete_store(self, store_name) -> int:
"""Deletes the specified store."""
Expand Down Expand Up @@ -99,9 +135,6 @@ def set_key(self, store_name, key, value, ttl=None) -> KeyDocument:
"dataset_id": self._dataset_id,
}

if self._dataset_id is None:
on_insert_fields.pop("dataset_id")

# Prepare the update operations
update_fields = {
"$set": {
Expand Down Expand Up @@ -134,6 +167,13 @@ def set_key(self, store_name, key, value, ttl=None) -> KeyDocument:

return key_doc

def has_key(self, store_name, key) -> bool:
"""Determines whether a key exists in the specified store."""
result = self._collection.find_one(
dict(store_name=store_name, key=key, dataset_id=self._dataset_id)
)
return bool(result)

def get_key(self, store_name, key) -> KeyDocument:
"""Gets a key from the specified store."""
raw_key_doc = self._collection.find_one(
Expand Down Expand Up @@ -198,20 +238,52 @@ def has_store_global(self, store_name):
)
return bool(result)

def list_stores_global(self) -> list[str]:
"""Lists the stores in the execution store across all datasets and the
global context.
"""
result = self._collection.find(
dict(key="__store__"), {"store_name": 1}
)
return [d["store_name"] for d in result]
def list_stores_global(self) -> list[StoreDocument]:
"""Lists stores across all datasets and the global context."""
pipeline = [
{
"$group": {
"_id": {
"store_name": "$store_name",
"dataset_id": "$dataset_id",
}
}
},
{
"$project": {
ritch marked this conversation as resolved.
Show resolved Hide resolved
"_id": 0,
"store_name": "$_id.store_name",
"dataset_id": "$_id.dataset_id",
}
},
]

result = self._collection.aggregate(pipeline)
return [StoreDocument(**d) for d in result]

def count_stores_global(self) -> int:
"""Counts the stores in the execution store across all datasets and the
global context.
"""Counts stores across all datasets and the global context."""
pipeline = [
{
"$group": {
"_id": {
"store_name": "$store_name",
"dataset_id": "$dataset_id",
}
}
},
{"$count": "total_stores"},
]

result = list(self._collection.aggregate(pipeline))
return result[0]["total_stores"] if result else 0

def delete_store_global(self, store_name) -> int:
"""Deletes the specified store across all datasets and the global
context.
"""
return self._collection.count_documents(dict(key="__store__"))
result = self._collection.delete_many(dict(store_name=store_name))
return result.deleted_count
ritch marked this conversation as resolved.
Show resolved Hide resolved


class MongoExecutionStoreRepo(ExecutionStoreRepo):
Expand Down
8 changes: 6 additions & 2 deletions fiftyone/operators/store/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ def to_mongo_dict(self, exclude_id: bool = True) -> dict[str, Any]:
data = asdict(self)
if exclude_id:
data.pop("_id", None)
if self.dataset_id is None:
data.pop("dataset_id", None)

return data


Expand All @@ -50,3 +49,8 @@ class StoreDocument(KeyDocument):

key: str = "__store__"
value: Optional[dict[str, Any]] = None

@property
def metadata(self) -> dict[str, Any]:
"""The metadata associated with the store."""
return self.value or {}
44 changes: 40 additions & 4 deletions fiftyone/operators/store/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def __init__(
self,
repo: Optional["ExecutionStoreRepo"] = None,
dataset_id: Optional[ObjectId] = None,
collection_name: str = None,
):

from fiftyone.factory.repo_factory import (
Expand All @@ -45,12 +46,15 @@ def __init__(

if repo is None:
repo = RepositoryFactory.execution_store_repo(
dataset_id=dataset_id
dataset_id=dataset_id,
collection_name=collection_name,
)

self._dataset_id = dataset_id
self._repo: ExecutionStoreRepo = repo

def create_store(self, store_name: str) -> StoreDocument:
def create_store(
Copy link
Contributor

@Br2850 Br2850 Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same create_store comment as before, I don't care which method name is used - I added a try/except for both just in case in DQ panel

self, store_name: str, metadata: Optional[dict[str, Any]] = None
) -> StoreDocument:
"""Creates a new store with the specified name.

Args:
Expand All @@ -59,7 +63,18 @@ def create_store(self, store_name: str) -> StoreDocument:
Returns:
a :class:`fiftyone.store.models.StoreDocument`
"""
return self._repo.create_store(store_name)
return self._repo.create_store(store_name, metadata)

def get_store(self, store_name: str) -> StoreDocument:
"""Gets the specified store for the current context.

Args:
store_name: the name of the store

Returns:
a :class:`fiftyone.store.models.StoreDocument`
"""
return self._repo.get_store(store_name)

def list_stores(self) -> list[str]:
"""Lists all stores for the current context.
Expand Down Expand Up @@ -116,6 +131,15 @@ def set_key(
"""
return self._repo.set_key(store_name, key, value, ttl=ttl)

def has_key(self, store_name: str, key: str) -> bool:
"""Determines whether the specified key exists in the specified store.

Args:
store_name: the name of the store
key: the key to check
"""
return self._repo.has_key(store_name, key)

def get_key(self, store_name: str, key: str) -> KeyDocument:
"""Retrieves the value of a key from the specified store.

Expand Down Expand Up @@ -208,3 +232,15 @@ def count_stores_global(self) -> int:
the number of stores
"""
return self._repo.count_stores_global()

def delete_store_global(self, store_name) -> int:
"""Deletes the specified store across all datasets and the global
context.

Args:
store_name: the name of the store

Returns:
the number of stores deleted
"""
return self._repo.delete_store_global(store_name)
5 changes: 4 additions & 1 deletion fiftyone/operators/store/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ def get_ttl(self, key: str) -> Optional[int]:
Returns:
the TTL in seconds, or None if the key does not have a TTL
"""
return self._store_service.get_ttl(self.store_name, key)
key_doc = self._store_service.get_key(self.store_name, key)
if key_doc is None:
return None
return key_doc.ttl

def list_keys(self) -> list[str]:
"""Lists all keys in the store.
Expand Down
19 changes: 19 additions & 0 deletions tests/unittests/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import unittest

import fiftyone as fo
import fiftyone.core.odm as foo


def drop_datasets(func):
Expand Down Expand Up @@ -43,6 +44,24 @@ async def wrapper(*args, **kwargs):
return wrapper


def drop_collection(collection_name):
"""Decorator that drops a collection from the database before and after running a test."""

def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
db = foo.get_db_conn()
db.drop_collection(collection_name)
try:
return func(*args, **kwargs)
finally:
db.drop_collection(collection_name)

return wrapper

return decorator
ritch marked this conversation as resolved.
Show resolved Hide resolved


def skip_windows(func):
"""Decorator that skips a test when running on Windows."""

Expand Down
Loading
Loading