Skip to content

Commit

Permalink
Database (performance) enhancements (#1584)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelveldt authored Aug 20, 2024
1 parent 0a048b4 commit 4943fca
Show file tree
Hide file tree
Showing 23 changed files with 709 additions and 302 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"request": "launch",
"module": "music_assistant",
"justMyCode": false,
"args": ["--log-level", "info"],
"args": ["--log-level", "debug"],
"env": { "PYTHONDEVMODE": "1" }
},
{
Expand Down
9 changes: 7 additions & 2 deletions music_assistant/client/music.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,15 @@ async def remove_item_from_library(
library_item_id=library_item_id,
)

async def add_item_to_library(
    self, item: str | MediaItemType, overwrite_existing: bool = False
) -> MediaItemType:
    """Add an item (given as uri or as media item object) to the library.

    Sends the ``music/library/add_item`` command, forwarding ``item`` and
    ``overwrite_existing`` unchanged, and returns the command's result.
    """
    added_item = await self.client.send_command(
        "music/library/add_item",
        item=item,
        overwrite_existing=overwrite_existing,
    )
    # cast() only informs the type checker; the result is returned as received
    return cast(MediaItemType, added_item)

async def refresh_item(
Expand Down
17 changes: 16 additions & 1 deletion music_assistant/common/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

import contextlib
from enum import EnumType, StrEnum
from enum import EnumType, IntEnum, StrEnum


class MediaTypeMeta(EnumType):
Expand Down Expand Up @@ -432,3 +432,18 @@ class StreamType(StrEnum):
ICY = "icy" # http stream with icy metadata
LOCAL_FILE = "local_file"
CUSTOM = "custom"


class CacheCategory(IntEnum):
    """Predefined cache categories, each identified by a fixed integer value."""

    # NOTE(review): presumably these integers are what callers pass as the
    # ``category`` of a cache entry - confirm before renumbering any member.

    # fallback category
    DEFAULT = 0

    # music related categories
    MUSIC_SEARCH = 1
    MUSIC_ALBUM_TRACKS = 2
    MUSIC_ARTIST_TRACKS = 3
    MUSIC_ARTIST_ALBUMS = 4
    MUSIC_PLAYLIST_TRACKS = 5
    MUSIC_PROVIDER_ITEM = 6

    # remaining categories
    PLAYER_QUEUE_STATE = 7
    MEDIA_INFO = 8
    LIBRARY_ITEMS = 9
159 changes: 127 additions & 32 deletions music_assistant/server/controllers/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.cache")
CONF_CLEAR_CACHE = "clear_cache"
DB_SCHEMA_VERSION = 4
DB_SCHEMA_VERSION = 5


class CacheController(CoreController):
Expand Down Expand Up @@ -76,83 +76,129 @@ async def close(self) -> None:
"""Cleanup on exit."""
await self.database.close()

async def get(
    self,
    key: str,
    checksum: str | None = None,
    default=None,
    category: int = 0,
    base_key: str = "",
) -> Any:
    """Get object from cache and return the results.

    key: the (unique) name of the cache object as reference
    checksum: optional argument to check if the checksum in the
                cacheobject matches the checksum provided
    default: value returned when no valid (matching, unexpired) entry exists
    category: optional category to group cache objects
    base_key: optional base key to group cache objects

    Returns None (not ``default``) when ``key`` is empty.
    """
    if not key:
        return None
    cur_time = int(time.time())
    if checksum is not None and not isinstance(checksum, str):
        checksum = str(checksum)

    # try memory cache first
    memory_key = f"{category}/{base_key}/{key}"
    cache_data = self._mem_cache.get(memory_key)
    if cache_data and (not checksum or cache_data[1] == checksum) and cache_data[2] >= cur_time:
        return cache_data[0]

    # fall back to db cache
    db_row = await self.database.get_row(
        DB_TABLE_CACHE, {"category": category, "base_key": base_key, "sub_key": key}
    )
    if not db_row:
        return default
    if checksum and db_row["checksum"] != checksum:
        return default
    # The expiry check applies whether or not a checksum was given. Written as
    # ``not checksum or A and B`` it would bind as ``not checksum or (A and B)``
    # and hand out expired rows to every caller that passes no checksum.
    if db_row["expires"] < cur_time:
        return default

    try:
        data = await asyncio.to_thread(json_loads, db_row["data"])
    except Exception as exc:  # pylint: disable=broad-except
        LOGGER.error(
            "Error parsing cache data for %s: %s",
            memory_key,
            str(exc),
            # only attach the traceback when debug logging is enabled
            exc_info=exc if self.logger.isEnabledFor(logging.DEBUG) else None,
        )
        return default

    # also store in memory cache for faster access
    self._mem_cache[memory_key] = (
        data,
        db_row["checksum"],
        db_row["expires"],
    )
    return data

async def set(
    self, key, data, checksum="", expiration=(86400 * 30), category: int = 0, base_key: str = ""
) -> None:
    """Store data in the cache.

    The entry always goes into the memory cache; it is only written to the
    database when it stays valid for at least four more hours.
    """
    if not key:
        return
    if not (checksum is None or isinstance(checksum, str)):
        checksum = str(checksum)
    expires = int(time.time() + expiration)
    self._mem_cache[f"{category}/{base_key}/{key}"] = (data, checksum, expires)
    # do not cache items in db with short expiration
    if (expires - time.time()) < 3600 * 4:
        return
    # serialize off the event loop
    serialized = await asyncio.to_thread(json_dumps, data)
    db_values = {
        "category": category,
        "base_key": base_key,
        "sub_key": key,
        "expires": expires,
        "checksum": checksum,
        "data": serialized,
    }
    await self.database.insert(DB_TABLE_CACHE, db_values, allow_replace=True)

async def delete(
    self, key: str | None, category: int | None = None, base_key: str | None = None
) -> None:
    """Delete data from cache.

    Every argument that is not None becomes a column condition for the
    database delete. The memory cache drops just the one matching entry when
    all three arguments are given; otherwise it is cleared completely.
    """
    candidates = (("sub_key", key), ("category", category), ("base_key", base_key))
    match: dict[str, str | int] = {
        column: value for column, value in candidates if value is not None
    }
    if len(match) == len(candidates):
        # fully qualified entry: the exact memory key is known
        self._mem_cache.pop(f"{category}/{base_key}/{key}", None)
    else:
        self._mem_cache.clear()
    await self.database.delete(DB_TABLE_CACHE, match)

async def clear(
    self,
    key_filter: str | None = None,
    category: int | None = None,
    base_key_filter: str | None = None,
) -> None:
    """Clear all/partial items from cache.

    key_filter: only delete rows whose sub_key contains this text
    category: only delete rows of this category
    base_key_filter: only delete rows whose base_key contains this text

    The memory cache is always cleared completely. Given filters are combined
    with AND; without any filter the delete is issued with no query.
    """

    def _contains(column: str, needle: str) -> str:
        """Build a ``column LIKE '%needle%'`` condition with quotes escaped."""
        # A raw "'" would terminate the SQL string literal early (malformed or
        # injectable statement); SQL escapes it by doubling the quote.
        # NOTE(review): binding the values as parameters would be preferable,
        # if the database helper's delete() supports that.
        escaped = needle.replace("'", "''")
        return f"{column} LIKE '%{escaped}%'"

    self._mem_cache.clear()
    self.logger.info("Clearing database...")
    query_parts: list[str] = []
    if category is not None:
        # int() guarantees that only a number ends up in the statement
        query_parts.append(f"category = {int(category)}")
    if base_key_filter is not None:
        query_parts.append(_contains("base_key", base_key_filter))
    if key_filter is not None:
        query_parts.append(_contains("sub_key", key_filter))
    query = "WHERE " + " AND ".join(query_parts) if query_parts else None
    await self.database.delete(DB_TABLE_CACHE, query=query)
    await self.database.vacuum()
    self.logger.info("Clearing database DONE")

async def auto_cleanup(self) -> None:
"""Run scheduled auto cleanup task."""
self.logger.debug("Running automatic cleanup...")
# for now we simply reset the memory cache
self._mem_cache = {}
# simply reset the memory cache
self._mem_cache.clear()
cur_timestamp = int(time.time())
cleaned_records = 0
for db_row in await self.database.get_rows(DB_TABLE_CACHE):
Expand Down Expand Up @@ -202,6 +248,15 @@ async def _setup_database(self) -> None:
DB_TABLE_SETTINGS,
{"key": "version", "value": str(DB_SCHEMA_VERSION), "type": "str"},
)
await self.__create_database_indexes()
# compact db
self.logger.debug("Compacting database...")
try:
await self.database.vacuum()
except Exception as err:
self.logger.warning("Database vacuum failed: %s", str(err))
else:
self.logger.debug("Compacting database done")

async def __create_database_tables(self) -> None:
"""Create database table(s)."""
Expand All @@ -214,13 +269,40 @@ async def __create_database_tables(self) -> None:
)
await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_CACHE}(
key TEXT UNIQUE NOT NULL, expires INTEGER NOT NULL,
data TEXT, checksum TEXT NULL)"""
[id] INTEGER PRIMARY KEY AUTOINCREMENT,
[category] INTEGER NOT NULL DEFAULT 0,
[base_key] TEXT NOT NULL,
[sub_key] TEXT NOT NULL,
[expires] INTEGER NOT NULL,
[data] TEXT,
[checksum] TEXT NULL,
UNIQUE(category, base_key, sub_key)
)"""
)

# create indexes
await self.database.commit()

async def __create_database_indexes(self) -> None:
    """Create the (non-unique) lookup indexes on the cache table.

    One index is created per column combination listed below, named
    ``<table>_<columns joined by underscore>_idx``; existing indexes are kept.
    """
    indexed_column_sets = (
        ("category",),
        ("base_key",),
        ("sub_key",),
        ("category", "base_key"),
        ("category", "base_key", "sub_key"),
    )
    for columns in indexed_column_sets:
        index_name = f"{DB_TABLE_CACHE}_{'_'.join(columns)}_idx"
        column_list = ",".join(columns)
        await self.database.execute(
            f"CREATE INDEX IF NOT EXISTS {index_name} ON {DB_TABLE_CACHE}({column_list});"
        )
    await self.database.commit()

Expand All @@ -237,6 +319,7 @@ def __schedule_cleanup_task(self) -> None:

def use_cache(
expiration: int = 86400 * 30,
category: int = 0,
) -> Callable[[Callable[Param, RetType]], Callable[Param, RetType]]:
"""Return decorator that can be used to cache a method's result."""

Expand All @@ -245,23 +328,31 @@ def wrapper(func: Callable[Param, RetType]) -> Callable[Param, RetType]:
async def wrapped(*args: Param.args, **kwargs: Param.kwargs):
method_class = args[0]
method_class_name = method_class.__class__.__name__
cache_key_parts = [method_class_name, func.__name__]
cache_base_key = f"{method_class_name}.{func.__name__}"
cache_sub_key_parts = []
skip_cache = kwargs.pop("skip_cache", False)
cache_checksum = kwargs.pop("cache_checksum", "")
if len(args) > 1:
cache_key_parts += args[1:]
cache_sub_key_parts += args[1:]
for key in sorted(kwargs.keys()):
cache_key_parts.append(f"{key}{kwargs[key]}")
cache_key = ".".join(cache_key_parts)
cache_sub_key_parts.append(f"{key}{kwargs[key]}")
cache_sub_key = ".".join(cache_sub_key_parts)

cachedata = await method_class.cache.get(cache_key, checksum=cache_checksum)
cachedata = await method_class.cache.get(
cache_sub_key, checksum=cache_checksum, category=category, base_key=cache_base_key
)

if not skip_cache and cachedata is not None:
return cachedata
result = await func(*args, **kwargs)
asyncio.create_task(
method_class.cache.set(
cache_key, result, expiration=expiration, checksum=cache_checksum
cache_sub_key,
result,
expiration=expiration,
checksum=cache_checksum,
category=category,
base_key=cache_base_key,
)
)
return result
Expand Down Expand Up @@ -316,3 +407,7 @@ def __iter__(self) -> Iterator:
def __len__(self) -> int:
"""Return length."""
return len(self.d)

def clear(self) -> None:
    """Remove every entry from the underlying store ``self.d`` in place."""
    self.d.clear()
Loading

0 comments on commit 4943fca

Please sign in to comment.