From 0edff2628d082f87a8fed979340fde37bc79e3cc Mon Sep 17 00:00:00 2001 From: quodrumglas Date: Tue, 13 Feb 2024 11:14:38 +0000 Subject: [PATCH] New Model Mapping --- mopidy_tidal/cache.py | 40 +++ mopidy_tidal/full_models_mappers.py | 91 ----- mopidy_tidal/helpers.py | 11 +- mopidy_tidal/library.py | 445 ++++-------------------- mopidy_tidal/lru_cache.py | 206 ----------- mopidy_tidal/models.py | 517 ++++++++++++++++++++++++++++ mopidy_tidal/playback.py | 33 +- mopidy_tidal/playlists.py | 435 ++++++++--------------- mopidy_tidal/ref_models_mappers.py | 141 -------- mopidy_tidal/search.py | 274 +++++---------- mopidy_tidal/uri.py | 93 +++++ mopidy_tidal/utils.py | 4 - mopidy_tidal/workers.py | 100 +++--- pyproject.toml | 1 + 14 files changed, 995 insertions(+), 1396 deletions(-) create mode 100755 mopidy_tidal/cache.py delete mode 100755 mopidy_tidal/full_models_mappers.py delete mode 100755 mopidy_tidal/lru_cache.py create mode 100644 mopidy_tidal/models.py delete mode 100755 mopidy_tidal/ref_models_mappers.py create mode 100644 mopidy_tidal/uri.py diff --git a/mopidy_tidal/cache.py b/mopidy_tidal/cache.py new file mode 100755 index 0000000..e35c19b --- /dev/null +++ b/mopidy_tidal/cache.py @@ -0,0 +1,40 @@ +from functools import wraps +from cachetools import LRUCache, cached +from cachetools.keys import hashkey + + +_by_uri_cache = LRUCache(maxsize=16*1024) +_items_cache = LRUCache(maxsize=16*1024) +_futures_cache = LRUCache(maxsize=16*1024) + +cached_by_uri = cached( + _by_uri_cache, + key=lambda *args, uri, **kwargs: hash(uri), +) +cached_items = cached( + _items_cache, + key=lambda item, *args, **kwargs: hashkey(item.uri, item.last_modified), +) +cached_future = cached( + _futures_cache, + key=lambda *args, uri, **kwargs: hash(uri), +) + + +def cache_by_uri(_callable): + @wraps(_callable) + def wrapper(*args, **kwargs): + item = _callable(*args, **kwargs) + _by_uri_cache[hash(item.ref.uri)] = item + return item + return wrapper + + +def cache_future(_callable): 
+ @wraps(_callable) + def wrapper(*args, **kwargs): + item = _callable(*args, **kwargs) + if item: + _futures_cache[hash(item.ref.uri)] = item + return item + return wrapper diff --git a/mopidy_tidal/full_models_mappers.py b/mopidy_tidal/full_models_mappers.py deleted file mode 100755 index d73e2ea..0000000 --- a/mopidy_tidal/full_models_mappers.py +++ /dev/null @@ -1,91 +0,0 @@ -from __future__ import unicode_literals - -import logging - -from mopidy.models import Album, Artist, Playlist, Track - -from mopidy_tidal.helpers import to_timestamp - -logger = logging.getLogger(__name__) - - -def _get_release_date(obj): - d = None - for attr in ("release_date", "tidal_release_date"): - d = getattr(obj, attr, None) - if d: - break - - if d: - return str(d.year) - - -def create_mopidy_artists(tidal_artists): - return [create_mopidy_artist(a) for a in tidal_artists] - - -def create_mopidy_artist(tidal_artist): - if tidal_artist is None: - return None - - return Artist(uri="tidal:artist:" + str(tidal_artist.id), name=tidal_artist.name) - - -def create_mopidy_albums(tidal_albums): - return [create_mopidy_album(a, None) for a in tidal_albums] - - -def create_mopidy_album(tidal_album, artist): - if artist is None: - artist = create_mopidy_artist(tidal_album.artist) - - return Album( - uri="tidal:album:" + str(tidal_album.id), - name=tidal_album.name, - artists=[artist], - date=_get_release_date(tidal_album), - ) - - -def create_mopidy_tracks(tidal_tracks): - return [create_mopidy_track(None, None, t) for t in tidal_tracks] - - -def create_mopidy_track(artist, album, tidal_track): - uri = "tidal:track:{0}:{1}:{2}".format( - tidal_track.artist.id, tidal_track.album.id, tidal_track.id - ) - if artist is None: - artist = create_mopidy_artist(tidal_track.artist) - if album is None: - album = create_mopidy_album(tidal_track.album, artist) - - track_len = tidal_track.duration * 1000 - return Track( - uri=uri, - name=tidal_track.full_name, - track_no=tidal_track.track_num, - 
artists=[artist], - album=album, - length=track_len, - date=_get_release_date(tidal_track), - # Different attribute name for disc_num on tidalapi >= 0.7.0 - disc_no=getattr(tidal_track, "disc_num", getattr(tidal_track, "volume_num")), - ) - - -def create_mopidy_playlist(tidal_playlist, tidal_tracks): - return Playlist( - uri=f"tidal:playlist:{tidal_playlist.id}", - name=tidal_playlist.name, - tracks=tidal_tracks, - last_modified=to_timestamp(tidal_playlist.last_updated), - ) - - -def create_mopidy_mix_playlist(tidal_mix): - return Playlist( - uri=f"tidal:mix:{tidal_mix.id}", - name=f"{tidal_mix.title} ({tidal_mix.sub_title})", - tracks=create_mopidy_tracks(tidal_mix.items()), - ) diff --git a/mopidy_tidal/helpers.py b/mopidy_tidal/helpers.py index 5bed914..d2d14a2 100644 --- a/mopidy_tidal/helpers.py +++ b/mopidy_tidal/helpers.py @@ -5,7 +5,16 @@ def to_timestamp(dt): if not dt: return 0 if isinstance(dt, str): - dt = datetime.datetime.fromisoformat(dt) + if dt.lower() == "today": + dt = datetime.datetime.combine( + datetime.datetime.now().date(), + datetime.time.min + ).timestamp() + else: + dt = datetime.datetime.fromisoformat(dt) if isinstance(dt, datetime.datetime): dt = dt.timestamp() return int(dt) + +def return_none(*args, **kwargs): + return None diff --git a/mopidy_tidal/library.py b/mopidy_tidal/library.py index 72e55e0..5abcd94 100755 --- a/mopidy_tidal/library.py +++ b/mopidy_tidal/library.py @@ -1,147 +1,29 @@ from __future__ import unicode_literals import logging -from concurrent.futures import ThreadPoolExecutor -from typing import TYPE_CHECKING, List, Optional, Tuple, Union -from mopidy import backend, models -from mopidy.models import Album, Artist, Image, Playlist, Ref, SearchResult, Track -from requests.exceptions import HTTPError +from mopidy import backend +from mopidy.models import Ref, SearchResult -from mopidy_tidal import full_models_mappers, ref_models_mappers -from mopidy_tidal.login_hack import login_hack -from mopidy_tidal.lru_cache 
import LruCache -from mopidy_tidal.playlists import PlaylistMetadataCache +from mopidy_tidal.models import lookup_uri, model_factory_map +from mopidy_tidal.search import tidal_search from mopidy_tidal.utils import apply_watermark -from mopidy_tidal.workers import get_items - -if TYPE_CHECKING: # pragma: no cover - from mopidy_tidal.backend import TidalBackend +from mopidy_tidal.uri import URI, URIType logger = logging.getLogger(__name__) -class ImagesGetter: - def __init__(self, session): - self._session = session - self._image_cache = LruCache(directory="image") - - @staticmethod - def _log_image_not_found(obj): - logger.debug( - 'No images available for %s "%s"', - type(obj).__name__, - getattr(obj, "name", getattr(obj, "title", getattr(obj, "id"))), - ) - - @classmethod - def _get_image_uri(cls, obj): - method = None - - if hasattr(obj, "image"): - # Handle artists with missing images - if hasattr(obj, "picture") and getattr(obj, "picture", None) is None: - cls._log_image_not_found(obj) - return - - method = obj.image - else: - cls._log_image_not_found(obj) - return - - dimensions = (750, 640, 480) - for dim in dimensions: - args = (dim,) - try: - return method(*args) - except ValueError: - pass - - cls._log_image_not_found(obj) - - def _get_api_getter(self, item_type: str): - return getattr(self._session, item_type, None) - - def _get_images(self, uri) -> List[Image]: - assert uri.startswith("tidal:"), f"Invalid TIDAL URI: {uri}" - - parts = uri.split(":") - item_type = parts[1] - if item_type == "track": - # For tracks, retrieve the artwork of the associated album - item_type = "album" - item_id = parts[3] - uri = ":".join([parts[0], "album", parts[3]]) - elif item_type == "album": - item_id = parts[2] - elif item_type == "playlist": - item_id = parts[2] - elif item_type == "artist": - item_id = parts[2] - elif item_type == "mix": - item_id = parts[2] - else: - # uri has no image associated to it (eg. tidal:mood tidal:genres etc.) 
- return [] - - if uri in self._image_cache: - # Cache hit - return self._image_cache[uri] - - logger.debug("Retrieving %r from the API", uri) - getter = self._get_api_getter(item_type) - if not getter: - logger.warning("The API item type %s has no session getters", item_type) - return [] - - item = getter(item_id) - if not item: - logger.debug("%r is not available on the backend", uri) - return [] - - img_uri = self._get_image_uri(item) - if not img_uri: - logger.debug("%r has no associated images", uri) - return [] - - logger.debug("Image URL for %r: %r", uri, img_uri) - return [Image(uri=img_uri, width=320, height=320)] - - def __call__(self, uri: str) -> Tuple[str, List[Image]]: - try: - return uri, self._get_images(uri) - except (AssertionError, AttributeError, HTTPError) as err: - logger.error("%s when processing URI %r: %s", type(err), uri, err) - return uri, [] - - def cache_update(self, images): - self._image_cache.update(images) - - class TidalLibraryProvider(backend.LibraryProvider): - root_directory = models.Ref.directory(uri="tidal:directory", name="Tidal") - backend: "TidalBackend" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._artist_cache = LruCache() - self._album_cache = LruCache() - self._track_cache = LruCache() - self._playlist_cache = PlaylistMetadataCache() - - @property - def _session(self): - return self.backend.session + root_directory = Ref.directory(uri=str(URI(URIType.DIRECTORY)), name="Tidal") - @login_hack(passthrough=True) - def get_distinct(self, field, query=None) -> set[str]: + def get_distinct(self, field, query=None): from mopidy_tidal.search import tidal_search - logger.debug("Browsing distinct %s with query %r", field, query) - session = self._session + logger.info(f"Browsing distinct {field!s} with query {query!r}") + session = self.backend.session if not query: # library root - if field in {"artist", "albumartist"}: + if field == "artist" or field == "albumartist": return [ 
apply_watermark(a.name) for a in session.user.favorites.artists() ] @@ -149,7 +31,7 @@ def get_distinct(self, field, query=None) -> set[str]: return [ apply_watermark(a.name) for a in session.user.favorites.albums() ] - elif field in {"track", "track_name"}: + elif field == "track": return [ apply_watermark(t.name) for t in session.user.favorites.tracks() ] @@ -158,7 +40,7 @@ def get_distinct(self, field, query=None) -> set[str]: return [ apply_watermark(a.name) for a in session.user.favorites.artists() ] - elif field in {"album", "albumartist"}: + elif field == "album" or field == "albumartist": artists, _, _ = tidal_search(session, query=query, exact=True) if len(artists) > 0: artist = artists[0] @@ -167,272 +49,79 @@ def get_distinct(self, field, query=None) -> set[str]: apply_watermark(a.name) for a in self._get_artist_albums(session, artist_id) ] - elif field in {"track", "track_name"}: + elif field == "track": return [ apply_watermark(t.name) for t in session.user.favorites.tracks() ] pass + logger.warning(f"Browse distinct failed for: {field}") return [] - @login_hack - def browse(self, uri) -> list[Ref]: - logger.info("Browsing uri %s", uri) - if not uri or not uri.startswith("tidal:"): + def browse(self, uri): + logger.debug(f"TidalLibraryProvider.browse {uri}") + uri = URI.from_string(uri) + if not uri: return [] - session = self._session + session = self.backend.session # summaries - if uri == self.root_directory.uri: - return ref_models_mappers.create_root() - - elif uri == "tidal:my_artists": - return ref_models_mappers.create_artists( - get_items(session.user.favorites.artists) - ) - elif uri == "tidal:my_albums": - return ref_models_mappers.create_albums( - get_items(session.user.favorites.albums) - ) - elif uri == "tidal:my_playlists": - return self.backend.playlists.as_list() - elif uri == "tidal:my_tracks": - return ref_models_mappers.create_tracks( - get_items(session.user.favorites.tracks) - ) - elif uri == "tidal:home": - return 
ref_models_mappers.create_mixed_directory( - [m for m in session.home()] - ) - elif uri == "tidal:for_you": - return ref_models_mappers.create_mixed_directory( - [m for m in session.for_you()] - ) - elif uri == "tidal:explore": - return ref_models_mappers.create_mixed_directory( - [m for m in session.explore()] - ) - elif uri == "tidal:moods": - return ref_models_mappers.create_moods(session.moods()) - elif uri == "tidal:mixes": - return ref_models_mappers.create_mixes([m for m in session.mixes()]) - elif uri == "tidal:genres": - return ref_models_mappers.create_genres(session.genre.get_genres()) + summaries = { + "genres": session.genres, + "moods": session.moods, + "mixes": session.mixes, + "my_artists": session.user.favorites.artists, + "my_albums": session.user.favorites.albums, + "my_playlists": session.user.favorites.playlists, + "my_tracks": session.user.favorites.tracks, + # "my_mixes": session.user.favorites.mixes_and_radio, + "playlists": session.user.playlists, + } + + if uri.type == URIType.DIRECTORY: + return [ + Ref.directory(uri=str(URI(summary)), name=summary.replace("_", " ").title()) + for summary in summaries + ] + + summary = summaries.get(uri.type) + if summary: + return [m.ref for m in model_factory_map(summary())] # details - parts = uri.split(":") - nr_of_parts = len(parts) - - if nr_of_parts == 3 and parts[1] == "album": - return ref_models_mappers.create_tracks( - self._get_album_tracks(session, parts[2]) - ) - - if nr_of_parts == 3 and parts[1] == "artist": - top_10_tracks = ref_models_mappers.create_tracks( - self._get_artist_top_tracks(session, parts[2])[:10] - ) - - albums = ref_models_mappers.create_albums( - self._get_artist_albums(session, parts[2]) - ) - - return albums + top_10_tracks - - if nr_of_parts == 3 and parts[1] == "playlist": - return ref_models_mappers.create_tracks( - self._get_playlist_tracks(session, parts[2]) - ) - - if nr_of_parts == 3 and parts[1] == "mood": - return ref_models_mappers.create_playlists( - 
self._get_mood_items(session, parts[2]) - ) - - if nr_of_parts == 3 and parts[1] == "genre": - return ref_models_mappers.create_playlists( - self._get_genre_items(session, parts[2]) - ) - - if nr_of_parts == 3 and parts[1] == "mix": - return ref_models_mappers.create_tracks( - self._get_mix_tracks(session, parts[2]) - ) - - if nr_of_parts == 3 and parts[1] == "page": - return ref_models_mappers.create_mixed_directory(session.page.get(parts[2])) - - logger.debug("Unknown uri for browse request: %s", uri) - return [] - - @login_hack - def search(self, query=None, uris=None, exact=False) -> Optional[SearchResult]: - from mopidy_tidal.search import tidal_search try: - artists, albums, tracks = tidal_search( - self._session, query=query, exact=exact - ) - return SearchResult(artists=artists, albums=albums, tracks=tracks) - except Exception as ex: - logger.info("EX") - logger.info("%r", ex) - - @login_hack - def get_images(self, uris) -> dict[str, list[Image]]: - logger.info("Searching Tidal for images for %r" % uris) - images_getter = ImagesGetter(self._session) - - with ThreadPoolExecutor(4, thread_name_prefix="mopidy-tidal-images-") as pool: - pool_res = pool.map(images_getter, uris) + model = lookup_uri(session, uri) + except ValueError: + logger.warning("Browse request failed for: %s", uri) + return [] + else: + if model: + return [item.ref for item in model.items()] + logger.warning("Browse request failed for: %s", uri) + return [] - images = {uri: item_images for uri, item_images in pool_res if item_images} - images_getter.cache_update(images) + def search(self, query=None, uris=None, exact=False): + total = self.backend.get_config("search_result_count") + return SearchResult(**tidal_search( + self.backend.session, query=query, total=total, exact=exact + )) + + def get_images(self, uris): + images = { + uri: lookup_uri(self.backend.session, uri).images + for uri in uris + } return images - @login_hack - def lookup(self, uris=None) -> list[Track]: - if 
isinstance(uris, str): + def lookup(self, uris): + logger.debug(f"TidalLibraryProvider.lookup({uris!r})") + if isinstance(uris, str) or not hasattr(uris, "__iter__"): uris = [uris] - if not hasattr(uris, "__iter__"): - uris = [uris] - - tracks = [] - cache_updates = {} - - for uri in uris or []: - data = [] - try: - parts = uri.split(":") - item_type = parts[1] - cache_name = f"_{parts[1]}_cache" - cache_miss = True - - try: - data = getattr(self, cache_name)[uri] - cache_miss = not bool(data) - except (AttributeError, KeyError): - pass - - if cache_miss: - try: - lookup = getattr(self, f"_lookup_{parts[1]}") - except AttributeError: - continue - - data = cache_data = lookup(self._session, parts) - cache_updates[cache_name] = cache_updates.get(cache_name, {}) - if item_type == "playlist": - # Playlists should be persisted on the cache as objects, - # not as lists of tracks. Therefore, _lookup_playlist - # returns a tuple that we need to unpack - data, cache_data = data - - cache_updates[cache_name][uri] = cache_data - - if item_type == "playlist" and not cache_miss: - tracks += data.tracks - else: - tracks += data if hasattr(data, "__iter__") else [data] - except HTTPError as err: - logger.error("%s when processing URI %r: %s", type(err), uri, err) - - for cache_name, new_data in cache_updates.items(): - getattr(self, cache_name).update(new_data) - - self._track_cache.update({track.uri: track for track in tracks}) - logger.info("Returning %d tracks", len(tracks)) - return tracks - - @classmethod - def _get_playlist_tracks(cls, session, playlist_id): - pl = session.playlist(playlist_id) - getter_args = tuple() - return get_items(pl.tracks, *getter_args) - - @staticmethod - def _get_genre_items(session, genre_id): - from tidalapi.playlist import Playlist - - filtered_genres = [g for g in session.genre.get_genres() if genre_id == g.path] - if filtered_genres: - return filtered_genres[0].items(Playlist) - return [] - - @staticmethod - def _get_mood_items(session, 
mood_id): - filtered_moods = [ - m for m in session.moods() if mood_id == m.api_path.split("/")[-1] + return [ + t.full + for uri in uris + for t in lookup_uri(self.backend.session, uri).tracks() ] - - if filtered_moods: - mood = filtered_moods[0].get() - return [p for p in mood] - return [] - - @staticmethod - def _get_mix_tracks(session, mix_id): - filtered_mixes = [m for m in session.mixes() if mix_id == m.id] - if filtered_mixes: - return filtered_mixes[0].items() - return [] - - def _lookup_playlist(self, session, parts): - playlist_id = parts[2] - tidal_playlist = session.playlist(playlist_id) - tidal_tracks = self._get_playlist_tracks(session, playlist_id) - pl_tracks = full_models_mappers.create_mopidy_tracks(tidal_tracks) - pl = full_models_mappers.create_mopidy_playlist(tidal_playlist, pl_tracks) - # We need both the list of tracks and the mapped playlist object for - # caching purposes - return pl_tracks, pl - - @staticmethod - def _get_artist_albums(session, artist_id): - artist = session.artist(artist_id) - if not artist: - logger.warning("No such artist: %s", artist_id) - return [] - - return artist.get_albums() - - @staticmethod - def _get_album_tracks(session, album_id): - album = session.album(album_id) - if not album: - logger.warning("No such album: %s", album_id) - return [] - - return album.tracks() - - def _lookup_track(self, session, parts): - if len(parts) == 3: # Track in format `tidal:track:` - track_id = parts[2] - track = session.track(track_id) - album_id = str(track.album.id) - else: # Track in format `tidal:track:::` - album_id = parts[3] - track_id = parts[4] - tracks = self._get_album_tracks(session, album_id) - # We get a spurious coverage error since the next expression should never raise StopIteration - track = next(t for t in tracks if t.id == int(track_id)) # pragma: no cover - artist = full_models_mappers.create_mopidy_artist(track.artist) - album = full_models_mappers.create_mopidy_album(track.album, artist) - return 
[full_models_mappers.create_mopidy_track(artist, album, track)] - - def _lookup_album(self, session, parts): - album_id = parts[2] - tracks = self._get_album_tracks(session, album_id) - - return full_models_mappers.create_mopidy_tracks(tracks) - - @staticmethod - def _get_artist_top_tracks(session, artist_id): - return session.artist(artist_id).get_top_tracks() - - def _lookup_artist(self, session, parts): - artist_id = parts[2] - tracks = self._get_artist_top_tracks(session, artist_id) - return full_models_mappers.create_mopidy_tracks(tracks) diff --git a/mopidy_tidal/lru_cache.py b/mopidy_tidal/lru_cache.py deleted file mode 100755 index 2449870..0000000 --- a/mopidy_tidal/lru_cache.py +++ /dev/null @@ -1,206 +0,0 @@ -from __future__ import unicode_literals - -import logging -import pickle -from collections import OrderedDict -from pathlib import Path -from typing import Optional - -from mopidy_tidal import Extension, context - -logger = logging.getLogger(__name__) - - -def id_to_cachef(id: str) -> Path: - return Path(id.replace(":", "-") + ".cache") - - -class LruCache(OrderedDict): - def __init__(self, max_size: Optional[int] = 1024, persist=True, directory=""): - """ - :param max_size: Max size of the cache in memory. 
Set 0 or None for no - limit (default: 1024) - :param persist: Whether the cache should be persisted to disk - (default: True) - :param directory: If `persist=True`, store the cached entries in this - subfolder of the cache directory (default: '') - """ - super().__init__(self) - if max_size: - assert max_size > 0, f"Invalid cache size: {max_size}" - - self._max_size = max_size or 0 - self._cache_dir = Path(Extension.get_cache_dir(context.get_config()), directory) - self._persist = persist - if persist: - self._cache_dir.mkdir(parents=True, exist_ok=True) - - self._check_limit() - - @property - def max_size(self): - return self._max_size - - @property - def persist(self): - return self._persist - - def cache_file(self, key: str, cache_dir: Optional[Path] = None) -> Path: - parts = key.split(":") - assert len(parts) > 2, f"Invalid TIDAL ID: {key}" - _, obj_type, id, *_ = parts - if not cache_dir: - cache_dir = Path(obj_type) - - cache_dir = self._cache_dir / cache_dir / id[:2] - cache_dir.mkdir(parents=True, exist_ok=True) - cache_file = cache_dir / id_to_cachef(key) - legacy_cache_file = cache_dir / f"{key}.cache" - if legacy_cache_file.is_file(): - return legacy_cache_file - - return cache_file - - def _get_from_storage(self, key): - cache_file = self.cache_file(key) - err = KeyError(key) - if not cache_file.is_file(): - # Cache miss on the filesystem - raise err - - # Cache hit on the filesystem - with open(cache_file, "rb") as f: - try: - value = pickle.load(f) - except Exception as e: - # If the cache entry on the filesystem is corrupt, reset it - logger.warning( - "Could not deserialize cache file %s: " "refreshing the entry: %s", - cache_file, - e, - ) - self._reset_stored_entry(key) - raise err - - # Store the filesystem item in memory - if value is not None: - self.__setitem__(key, value, _sync_to_fs=False) - logger.debug(f"Filesystem cache hit for {key}") - return value - - def __getitem__(self, key, *_, **__): - try: - # Cache hit in memory - return 
super().__getitem__(key) - except KeyError as e: - if not self.persist: - # No persisted storage -> cache miss - raise e - - # Check on the persisted cache - return self._get_from_storage(key) - - def __setitem__(self, key, value, _sync_to_fs=True, *_, **__): - if super().__contains__(key): - del self[key] - - super().__setitem__(key, value) - if self.persist and _sync_to_fs: - cache_file = self.cache_file(key) - with open(cache_file, "wb") as f: - pickle.dump(value, f) - - self._check_limit() - - def __contains__(self, key): - return self.get(key) is not None - - def _reset_stored_entry(self, key): - cache_file = self.cache_file(key) - if cache_file.is_file(): - cache_file.unlink() - - def get(self, key, default=None, *args, **kwargs): - try: - return self.__getitem__(key, *args, **kwargs) - except KeyError: - return default - - def prune(self, *keys): - """ - Delete the specified keys both from memory and disk. - """ - for key in keys: - logger.debug("Pruning key %r from cache %s", key, self.__class__.__name__) - - self._reset_stored_entry(key) - self.pop(key, None) - - def prune_all(self): - """ - Prune all the keys in the cache. 
- """ - self.prune(*[*self.keys()]) - - def update(self, *args, **kwargs): - super().update(*args, **kwargs) - self._check_limit() - - def _check_limit(self): - if self.max_size: - # delete oldest entries - while len(self) > self.max_size: - self.popitem(last=False) - - -class SearchCache(LruCache): - def __init__(self, func): - super().__init__(persist=False) - self._func = func - - def __call__(self, *args, **kwargs): - key = str(SearchKey(**kwargs)) - cached_result = self.get(key) - logger.info( - "Search cache miss" if cached_result is None else "Search cache hit" - ) - if cached_result is None: - cached_result = self._func(*args, **kwargs) - self[key] = cached_result - - return cached_result - - -class SearchKey(object): - def __init__(self, **kwargs): - fixed_query = self.fix_query(kwargs["query"]) - self._query = tuple(sorted(fixed_query.items())) - self._exact = kwargs["exact"] - self._hash = None - - def __hash__(self): - if self._hash is None: - self._hash = hash(self._exact) - self._hash ^= hash(repr(self._query)) - - return self._hash - - def __str__(self): - return f"tidal:search:{self.__hash__()}" - - def __eq__(self, other): - if not isinstance(other, SearchKey): - return False - - return self._exact == other._exact and self._query == other._query - - @staticmethod - def fix_query(query): - """ - Removes some query parameters that otherwise will lead to a cache miss. - Eg: 'track_no' since we can't query TIDAL for a specific album's track. 
- :param query: query dictionary - :return: sanitized query dictionary - """ - query.pop("track_no", None) - return query diff --git a/mopidy_tidal/models.py b/mopidy_tidal/models.py new file mode 100644 index 0000000..0acf43a --- /dev/null +++ b/mopidy_tidal/models.py @@ -0,0 +1,517 @@ +__all__ = ( + "Track", + "Album", + "Artist", + "Playlist", + "Mix", + "Page", + "lookup_uri", + "model_factory", + "model_factory_map", +) + +import logging + +import mopidy.models as mm +import tidalapi as tdl + +from mopidy_tidal.cache import cache_by_uri, cached_by_uri, cached_items, cache_future, cached_future +from mopidy_tidal.helpers import to_timestamp, return_none +from mopidy_tidal.uri import URI, URIType +from mopidy_tidal.workers import paginated + +DEFAULT_IMAGE = "https://tidal.com/browse/assets/images/defaultImages/default{0.__class__.__name__}Image.png".format +IMAGE_SIZE = 320 + +logger = logging.getLogger(__name__) + + +class Model: + def __init__(self, *, ref, api, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + self.ref = ref + self.api = api + self._full = None + + @classmethod + def from_api(cls, api): + raise NotImplementedError + + @classmethod + def from_uri(cls, session: tdl.Session, /, *, uri: str): + raise NotImplementedError + + @property + def uri(self): + return self.ref.uri + + @property + def name(self): + return self.ref.name + + @property + def full(self): + if self._full is None: + self._full = self.build() + return self._full + + @property + def last_modified(self): + return to_timestamp("today") + + def build(self): + raise NotImplementedError + + def items(self): + raise NotImplementedError + + def tracks(self): + raise NotImplementedError + + @property + def images(self): + raise NotImplementedError + + +class Track(Model): + artists = [] + album = None + + @classmethod + @cache_by_uri + def from_api(cls, track: tdl.Track): + uri = URI(URIType.TRACK, track.id) + return cls( + ref=mm.Ref.track(uri=str(uri), name=track), + 
api=track, + artists=[Artist.from_api(artist) for artist in track.artists], + album=Album.from_api(track.album) if track.album else None, + ) + + @classmethod + @cached_by_uri + def from_uri(cls, session: tdl.Session, /, *, uri): + uri = URI.from_string(uri) + if uri.type != URIType.TRACK: + raise ValueError("Not a valid uri for Track: %s", uri) + track = session.track(uri.track) + return cls( + ref=mm.Ref.track(uri=str(uri), name=track), + api=track, + artists=[Artist.from_api(artist) for artist in track.artists], + album=Album.from_api(track.album) if track.album else None, + ) + + def items(self): + raise AttributeError + + def tracks(self): + return [self] + + def radio(self): + return [ + Track.from_api(t) + for t in self.api.radio().items() + if isinstance(t, tdl.Track) + ] + + def build(self): + return mm.Track( + uri=self.uri, + name=self.name, + track_no=self.api.track_num, + artists=[artist.full for artist in self.artists], + album=self.album.full if self.album else None, + length=self.api.duration * 1000, + date=str(self.api.album.year) if self.api.album.year else None, + disc_no=self.api.volume_num, + genre=self.api.audio_quality.value, + comment=' '.join(map(str, self.api.media_metadata_tags)), + ) + + @property + def images(self): + images = [*self.album.images, *(img for artist in self.artists for img in artist.images)] + if all("/defaultImages/" in img.uri for img in images): + images = [mm.Image(uri=DEFAULT_IMAGE(self), width=IMAGE_SIZE, height=IMAGE_SIZE)] + return images + + +class Album(Model): + artists = [] + + @classmethod + def from_api(cls, album: tdl.Album): + uri = URI(URIType.ALBUM, album.id) + return cls( + ref=mm.Ref.album(uri=str(uri), name=album.name), + api=album, + artists=[Artist.from_api(artist) for artist in album.artists], + ) + + @classmethod + def from_uri(cls, session: tdl.Session, uri: str): + uri = URI.from_string(uri) + if uri.type != URIType.ALBUM: + raise ValueError("Not a valid uri for Album: %s", uri) + album = 
session.album(uri.album) + return cls( + ref=mm.Ref.album(uri=str(uri), name=album.name), + api=album, + artists=[Artist.from_api(artist) for artist in album.artists], + ) + + def build(self): + return mm.Album( + uri=self.uri, + name=self.name, + artists=[artist.full for artist in self.artists], + num_tracks=self.api.num_tracks, + num_discs=self.api.num_volumes, + date=str(self.api.year) if self.api.year else None, + ) + + def items(self): + return [ + Future.from_api(self.api.page, ref_type=mm.Ref.DIRECTORY, title=f"Page: {self.name}"), + *self.tracks() + ] + + def tracks(self): + return [Track.from_api(t) for t in self.api.tracks()] + + @property + def images(self): + image_uri = self.api.image(IMAGE_SIZE) if self.api.cover else DEFAULT_IMAGE(self) + return [mm.Image(uri=image_uri, width=IMAGE_SIZE, height=IMAGE_SIZE)] + + +class Artist(Model): + @classmethod + def from_api(cls, artist: tdl.Artist): + uri = URI(URIType.ARTIST, artist.id) + return cls( + ref=mm.Ref.artist(uri=str(uri), name=artist.name), + api=artist, + ) + + @classmethod + def from_uri(cls, session: tdl.Session, uri: str): + uri = URI.from_string(uri) + if uri.type != URIType.ARTIST: + raise ValueError("Not a valid uri for Artist: %s", uri) + artist = session.artist(uri.artist) + return cls( + ref=mm.Ref.artist(uri=str(uri), name=artist.name), + api=artist, + ) + + def build(self): + return mm.Artist( + uri=self.uri, + name=self.name, + # sortname=self.api.name, + ) + + def items(self): + return [ + Future.from_api(self.api.page, ref_type=mm.Ref.DIRECTORY, title=f"Page: {self.name}"), + Future.from_api(self.api.get_radio, ref_type=mm.Ref.PLAYLIST, title=f"Radio: {self.name}"), + *self.tracks(), + *(Album.from_api(album) for album in self.api.get_albums()), + ] + + def tracks(self, limit=10): + return [Track.from_api(track) for track in self.api.get_top_tracks(limit=limit)] + + @property + def images(self): + image_uri = self.api.image(IMAGE_SIZE) if self.api.picture else DEFAULT_IMAGE(self) + 
return [mm.Image(uri=image_uri, width=IMAGE_SIZE, height=IMAGE_SIZE)] + + +class Playlist(Model): + @classmethod + def from_api(cls, playlist: tdl.Playlist): + uri = URI(URIType.PLAYLIST, playlist.id) + return cls( + ref=mm.Ref.playlist(uri=str(uri), name=playlist.name), + api=playlist, + ) + + @classmethod + def from_uri(cls, session, uri): + uri = URI.from_string(uri) + if uri.type != URIType.PLAYLIST: + raise ValueError("Not a valid uri for Playlist: %s", uri) + playlist = session.playlist(uri.playlist) + return cls( + ref=mm.Ref.playlist(uri=str(uri), name=playlist.name), + api=playlist, + ) + + @property + def last_modified(self): + return to_timestamp(self.api.last_updated) + + def build(self): + return mm.Playlist( + uri=self.uri, + name=self.name, + tracks=[t.full for t in self.items()], + last_modified=self.last_modified, + ) + + def items(self): + return self.tracks() + + @cached_items + def tracks(self): + return [ + Track.from_api(item) + for page in paginated(self.api.tracks, total=self.api.num_tracks) + for item in page + if isinstance(item, tdl.Track) + ] + + @property + def images(self): + image_uri = self.api.image(IMAGE_SIZE) if self.api.square_picture else DEFAULT_IMAGE(self) + return [mm.Image(uri=image_uri, width=IMAGE_SIZE, height=IMAGE_SIZE)] + + +class PlaylistAsAlbum(Playlist): + def build(self): + return mm.Album( + uri=self.uri, + name=self.name, + artists=[Artist.from_api(artist).full for artist in self.api.promoted_artists or []], + num_tracks=self.api.num_tracks, + num_discs=1, + date=str(self.api.created.year) if self.api.created else None, + ) + + +class Mix(Model): + @classmethod + def from_api(cls, mix: tdl.Mix): + uri = URI(URIType.MIX, mix.id) + return cls( + ref=mm.Ref.playlist(uri=str(uri), name=f"{mix.title} ({mix.sub_title})"), + api=mix, + ) + + @classmethod + def from_uri(cls, session: tdl.Session, /, *, uri: str): + uri = URI.from_string(uri) + if uri.type != URIType.MIX: + raise ValueError("Not a valid uri for Mix: %s", 
uri) + mix = session.mix(uri.mix) + return cls( + ref=mm.Ref.playlist(uri=str(uri), name=f"{mix.title} ({mix.sub_title})"), + api=mix, + ) + + @property + def last_modified(self): + return to_timestamp(self.api.updated) + + def build(self): + return mm.Playlist( + uri=self.uri, + name=self.name, + tracks=[t.full for t in self.items()], + last_modified=self.last_modified, + ) + + def items(self): + return self.tracks() + + def tracks(self): + return [ + Track.from_api(item) + for item in self.api.items() + if isinstance(item, tdl.Track) + ] + + @property + def images(self): + return None + + +class Page(Model): + api_path = None + + @classmethod + def from_api(cls, page: tdl.Page): + uri = URI(URIType.PAGE) + return cls( + ref=mm.Ref.directory(uri=str(uri), name=page.title), + api=page, + ) + + @classmethod + def from_uri(cls, session, uri: str): + uri = URI.from_string(uri) + if uri.type != URIType.PAGE: + raise ValueError("Not a valid uri for Page: %s", uri) + page = session.page.get(uri.page) + return cls( + ref=mm.Ref.directory(uri=str(uri), name=page.title), + api=page, + api_path=uri.page + ) + + @property + def last_modified(self): + return to_timestamp("today") + + def build(self): + return self.ref + + def items(self): + return list(model_factory_map(self.api)) + + def tracks(self): + raise AttributeError + + @property + def images(self): + return None + + +class PageLink: + def __init__(self, title, api_path): + self.ref = mm.Ref.directory(uri=str(URI(URIType.PAGE, api_path)), name=title) + + @classmethod + def from_api(cls, page_link: tdl.page.PageLink): + return cls(page_link.title, page_link.api_path) + + +class PageItem(Model): + URI_REF_MAP = { + URIType.TRACK: mm.Ref.TRACK, + URIType.ALBUM: mm.Ref.ALBUM, + URIType.ARTIST: mm.Ref.ARTIST, + URIType.PLAYLIST: mm.Ref.PLAYLIST, + URIType.MIX: mm.Ref.PLAYLIST, + URIType.PAGE: mm.Ref.DIRECTORY, + } + + @classmethod + def from_api(cls, item: tdl.page.PageItem): + try: + uri_type = URIType[item.type] + except 
KeyError: + logger.error(f"Future return type unknown: {item.type!s}") + return None + try: + ref_type = cls.URI_REF_MAP[uri_type] + except KeyError: + logger.error(f"Future return type not supported: {uri_type!s}") + return None + uri = URI(uri_type, item.artifact_id) + ref = mm.Ref(type=ref_type, uri=str(uri), name=item.header) + return cls(ref=ref, api=item) + + def build(self): + return model_factory(self.api.get()) + + +class ItemList(Model): + @classmethod + def from_api(cls, items: list): + return cls( + ref=mm.Ref.playlist(uri=str(URI(URIType.PLAYLIST)), name=None), + api=items + ) + + def items(self): + return list(model_factory_map(self.api)) + + def tracks(self): + return self.items() + + def build(self): + return mm.Playlist( + uri=self.uri, + name=self.name, + tracks=[t.full for t in self.items()], + last_modified=to_timestamp("today"), + ) + + +class Future(Model): + @classmethod + @cache_future + def from_api(cls, future, /, *, ref_type: mm.Ref, title: str): + uri = URI(URIType.FUTURE, str(hash(future))) + return cls( + ref=mm.Ref(type=ref_type, uri=str(uri), name=title), + api=future, + ) + + @classmethod + @cached_future + def from_cache(cls, session: tdl.Session, /, *, uri: str): + return # None if cache decorator fails + + @classmethod + def from_uri(cls, session: tdl.Session, /, *, uri: str): + future = cls.from_cache(session, uri=uri) + if future: + return model_factory(future.api()) + + +def model_factory(api_item): + try: + tdl_api = next(k for k in _model_map.keys() if isinstance(api_item, k)) + except StopIteration: + raise ValueError(f"Not valid value to model: {api_item.__class__.__name__} {api_item!r}") + else: + return _model_map[tdl_api](api_item) + + +def model_factory_map(iterable): + for i in iterable: + try: + model = model_factory(i) + if model: + yield model + except ValueError as e: + logger.error(e) + + +_model_map = { + tdl.Track: Track.from_api, + tdl.Video: return_none, + tdl.Album: Album.from_api, + tdl.Artist: 
Artist.from_api, + tdl.Playlist: Playlist.from_api, + tdl.Mix: Mix.from_api, + tdl.Page: Page.from_api, + tdl.page.PageLink: PageLink.from_api, + tdl.page.PageItem: PageItem.from_api, + list: ItemList.from_api +} + + +def lookup_uri(session, uri): + uri = str(uri) + model_class = _uri_type_map.get(URI.from_string(uri).type) + if model_class is None: + raise ValueError(f"Not valid value as uri: {uri!s}") + return model_class(session, uri=uri) + + +_uri_type_map = { + URIType.TRACK: Track.from_uri, + URIType.ALBUM: Album.from_uri, + URIType.ARTIST: Artist.from_uri, + URIType.PLAYLIST: Playlist.from_uri, + URIType.MIX: Mix.from_uri, + URIType.PAGE: Page.from_uri, + URIType.FUTURE: Future.from_uri, +} diff --git a/mopidy_tidal/playback.py b/mopidy_tidal/playback.py index 0dbdb57..4f12081 100755 --- a/mopidy_tidal/playback.py +++ b/mopidy_tidal/playback.py @@ -1,38 +1,19 @@ from __future__ import unicode_literals -from typing import TYPE_CHECKING - -from login_hack import speak_login_hack - -if TYPE_CHECKING: # pragma: no cover - from mopidy_tidal.backend import TidalBackend - import logging +from cachetools import TTLCache, cachedmethod from mopidy import backend -from tidalapi import Quality +from mopidy_tidal.uri import URI logger = logging.getLogger(__name__) class TidalPlaybackProvider(backend.PlaybackProvider): - backend: "TidalBackend" - @speak_login_hack - def translate_uri(self, uri): - logger.info("TIDAL uri: %s", uri) - parts = uri.split(":") - track_id = int(parts[4]) - session = self.backend.session - if session.config.quality == Quality.hi_res_lossless: - if "HIRES_LOSSLESS" in session.track(track_id).media_metadata_tags: - logger.info("Playback quality: %s", session.config.quality) - else: - logger.info( - "No HI_RES available for this track; Using playback quality: %s", - "LOSSLESS", - ) + __cache = TTLCache(maxsize=128, ttl=120) - newurl = session.track(track_id).get_url() - logger.info("transformed into %s", newurl) - return newurl + 
@cachedmethod(lambda slf: slf.__cache) + def translate_uri(self, uri): + logger.debug("TidalPlaybackProvider translate_uri: %s", uri) + return self.backend.session.track(URI.from_string(uri).track).get_url() diff --git a/mopidy_tidal/playlists.py b/mopidy_tidal/playlists.py index 6074df3..91d5ab4 100644 --- a/mopidy_tidal/playlists.py +++ b/mopidy_tidal/playlists.py @@ -1,322 +1,167 @@ from __future__ import unicode_literals -import difflib import logging -import operator -from concurrent.futures import ThreadPoolExecutor -from pathlib import Path -from threading import Event, Timer -from typing import TYPE_CHECKING, Collection, List, Optional, Tuple, Union - -from mopidy import backend -from mopidy.models import Playlist as MopidyPlaylist -from mopidy.models import Ref, Track -from requests import HTTPError -from tidalapi.playlist import Playlist as TidalPlaylist - -from mopidy_tidal import full_models_mappers -from mopidy_tidal.full_models_mappers import create_mopidy_playlist -from mopidy_tidal.helpers import to_timestamp -from mopidy_tidal.login_hack import login_hack -from mopidy_tidal.lru_cache import LruCache -from mopidy_tidal.utils import mock_track -from mopidy_tidal.workers import get_items - -if TYPE_CHECKING: # pragma: no cover - from mopidy_tidal.backend import TidalBackend +from typing import List, Optional + +from cachetools import cached, TTLCache, cachedmethod +from mopidy.backend import PlaylistsProvider +from mopidy.models import Playlist, Ref + +from mopidy_tidal.models import lookup_uri, model_factory_map +from mopidy_tidal.uri import URI, URIType +from mopidy_tidal.workers import sorted_threaded logger = logging.getLogger(__name__) -class PlaylistCache(LruCache): - def __getitem__( - self, key: Union[str, TidalPlaylist], *args, **kwargs - ) -> MopidyPlaylist: - uri = key.id if isinstance(key, TidalPlaylist) else key - assert uri - uri = f"tidal:playlist:{uri}" if not uri.startswith("tidal:playlist:") else uri +class 
TidalPlaylistsProvider(PlaylistsProvider): + NEW_PLAYLIST_URI = f"{URI(URIType.PLAYLIST, 'new')}" - playlist = super().__getitem__(uri, *args, **kwargs) - if ( - playlist - and isinstance(key, TidalPlaylist) - and to_timestamp(key.last_updated) > to_timestamp(playlist.last_modified) - ): - # The playlist has been updated since last time: - # we should refresh the associated cache entry - logger.info('The playlist "%s" has been updated: refresh forced', key.name) + def __init__(self, *args, playlist_cache_ttl, **kwargs): + super().__init__(*args, **kwargs) + self.__as_list_cache = TTLCache(maxsize=1, ttl=playlist_cache_ttl) - raise KeyError(uri) + @cachedmethod(lambda self: self.__as_list_cache) + def as_list(self) -> List[Ref]: + """ + Get a list of the currently available playlists. - return playlist + Returns a list of :class:`~mopidy.models.Ref` objects referring to the + playlists. In other words, no information about the playlists' content + is given. + :rtype: list of :class:`mopidy.models.Ref` -class PlaylistMetadataCache(PlaylistCache): - def cache_file(self, key: str) -> Path: - return super().cache_file(key, Path("playlist_metadata")) + .. versionadded:: 1.0 + """ + logger.debug(f"TidalPlaylistsProvider.as_list() ttl: {self.as_list.cache(self).ttl}") + results = sorted_threaded( + self.backend.session.user.playlist_and_favorite_playlists, + ) + return [m.ref.replace(name=m.ref.name) + for m in model_factory_map(i for items in results for i in items)] + def get_items(self, uri: str) -> Optional[List[Ref]]: + """ + Get the items in a playlist specified by ``uri``. -class TidalPlaylistsProvider(backend.PlaylistsProvider): - backend: "TidalBackend" + Returns a list of :class:`~mopidy.models.Ref` objects referring to the + playlist's items. 
- def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._playlists_metadata = PlaylistMetadataCache() - self._playlists = PlaylistCache() - self._current_tidal_playlists = [] - self._playlists_loaded_event = Event() - - def _calculate_added_and_removed_playlist_ids( - self, - ) -> Tuple[Collection[str], Collection[str]]: - logger.info("Calculating playlist updates..") - session = self.backend.session - updated_playlists = [] - - with ThreadPoolExecutor( - 2, thread_name_prefix="mopidy-tidal-playlists-refresh-" - ) as pool: - pool_res = pool.map( - lambda func: get_items(func) - if func == session.user.favorites.playlists - else func(), - [ - session.user.favorites.playlists, - session.user.playlists, - ], - ) - - for playlists in pool_res: - updated_playlists += playlists - - self._current_tidal_playlists = updated_playlists - updated_ids = set(pl.id for pl in updated_playlists) - if not self._playlists_metadata: - return updated_ids, set() - - current_ids = set(uri.split(":")[-1] for uri in self._playlists_metadata.keys()) - added_ids = updated_ids.difference(current_ids) - removed_ids = current_ids.difference(updated_ids) - self._playlists_metadata.prune( - *[ - uri - for uri in self._playlists_metadata.keys() - if uri.split(":")[-1] in removed_ids - ] - ) + If a playlist with the given ``uri`` doesn't exist, it returns + :class:`None`. - return added_ids, removed_ids + :rtype: list of :class:`mopidy.models.Ref`, or :class:`None` - def _has_changes(self, playlist: MopidyPlaylist): - upstream_playlist = self.backend.session.playlist(playlist.uri.split(":")[-1]) - if not upstream_playlist: - return True + .. 
versionadded:: 1.0 + """ + logger.debug(f"TidalPlaylistsProvider.get_items({uri})") + return [ + t.ref + for t in lookup_uri(self.backend.session, uri).tracks() + ] - upstream_last_updated_at = to_timestamp( - getattr(upstream_playlist, "last_updated", None) + def create(self, name: str) -> Optional[Playlist]: + """ + Create a new empty playlist with the given name. + + Returns a new playlist with the given name and an URI, or :class:`None` + on failure. + + *MUST be implemented by subclass.* + + :param name: name of the new playlist + :type name: string + :rtype: :class:`mopidy.models.Playlist` or :class:`None` + """ + logger.debug("TidalPlaylistsProvider.create(%s)", name) + return Playlist( + uri=self.NEW_PLAYLIST_URI, + name=name, + tracks=[], + last_modified=0 ) - local_last_updated_at = to_timestamp(playlist.last_modified) - if not upstream_last_updated_at: - logger.warning( - "You are using a version of python-tidal that does not " - "support last_updated on playlist objects" - ) - return True + def delete(self, uri: str) -> bool: + """ + Delete playlist identified by the URI. - if upstream_last_updated_at > local_last_updated_at: - logger.info( - 'The playlist "%s" has been updated: refresh forced', playlist.name - ) - return True + Returns :class:`True` if deleted, :class:`False` otherwise. - return False + *MUST be implemented by subclass.* - @login_hack(list[Ref.playlist]) - def as_list(self) -> list[Ref]: - if not self._playlists_loaded_event.is_set(): - added_ids, _ = self._calculate_added_and_removed_playlist_ids() - if added_ids: - self.refresh(include_items=False) - - logger.debug("Listing TIDAL playlists..") - refs = [ - Ref.playlist(uri=pl.uri, name=pl.name) - for pl in self._playlists_metadata.values() - ] + :param uri: URI of the playlist to delete + :type uri: string + :rtype: :class:`bool` + + .. versionchanged:: 2.2 + Return type defined. 
+ """ + logger.error("NotImplemented: TidalPlaylistsProvider.delete(%s)", uri) + return False - return sorted(refs, key=operator.attrgetter("name")) - - def _lookup_mix(self, uri): - mix_id = uri.split(":")[-1] - session = self.backend.session - return session.mix(mix_id) - - def _get_or_refresh_playlist(self, uri) -> Optional[MopidyPlaylist]: - parts = uri.split(":") - if parts[1] == "mix": - mix = self._lookup_mix(uri) - return full_models_mappers.create_mopidy_mix_playlist(mix) - - playlist = self._playlists.get(uri) - if (playlist is None) or (playlist and self._has_changes(playlist)): - self.refresh(uri, include_items=True) - return self._playlists.get(uri) - - def create(self, name): - tidal_playlist = self.backend.session.user.create_playlist(name, "") - pl = create_mopidy_playlist(tidal_playlist, []) - - self._current_tidal_playlists.append(tidal_playlist) - self.refresh(pl.uri) - return pl - - def delete(self, uri): - playlist_id = uri.split(":")[-1] - session = self.backend.session - - try: - session.request.request( - "DELETE", - "playlists/{playlist_id}".format( - playlist_id=playlist_id, - ), - ) - except HTTPError as e: - # If we got a 401, it's likely that the user is following - # this playlist but they don't have permissions for removing - # it. If that's the case, remove the playlist from the - # favourites instead of deleting it. 
- if e.response.status_code == 401 and uri in { - f"tidal:playlist:{pl.id}" for pl in session.user.favorites.playlists() - }: - session.user.favorites.remove_playlist(playlist_id) - else: - raise e - - self._playlists_metadata.prune(uri) - self._playlists.prune(uri) - - @login_hack - def lookup(self, uri) -> Optional[MopidyPlaylist]: - return self._get_or_refresh_playlist(uri) - - @login_hack - def refresh(self, *uris, include_items: bool = True) -> dict[str, MopidyPlaylist]: - if uris: - logger.info("Looking up playlists: %r", uris) + @cached(TTLCache(maxsize=1, ttl=3), key=lambda _, uri: hash(uri)) + def lookup(self, uri: str) -> Optional[Playlist]: + """ + Lookup playlist with given URI in both the set of playlists and in any + other playlist source. + + Returns the playlists or :class:`None` if not found. + + *MUST be implemented by subclass.* + + :param uri: playlist URI + :type uri: string + :rtype: :class:`mopidy.models.Playlist` or :class:`None` + """ + logger.debug("TidalPlaylistsProvider.lookup(%s)", uri) + injections = {p.uri: p for p in self.INJECTED_PLAYLISTS.values()} + if uri in injections: + return injections[uri] + return lookup_uri(self.backend.session, uri).full + + def refresh(self, *args, **kwargs) -> None: + """ + Refresh the playlists in :attr:`playlists`. + + *MUST be implemented by subclass.* + """ + logger.error("NotImplemented: TidalPlaylistsProvider.refresh(%s, %s)", (args, kwargs)) + + def save(self, playlist: Playlist) -> Optional[Playlist]: + """ + Save the given playlist. + + The playlist must have an ``uri`` attribute set. To create a new + playlist with an URI, use :meth:`create`. + + Returns the saved playlist or :class:`None` on failure. 
+ + *MUST be implemented by subclass.* + + :param playlist: the playlist to save + :type playlist: :class:`mopidy.models.Playlist` + :rtype: :class:`mopidy.models.Playlist` or :class:`None` + """ + old_playlist = self.lookup.cache.get(hash(playlist.uri)) + if old_playlist: + logger.debug(f"TidalPlaylistsProvider.save: existing {playlist.uri}, {playlist.name}, {len(playlist.tracks)}") + new_tracks = set(t.uri for t in playlist.tracks) + old_tracks = set(t.uri for t in old_playlist.tracks) + self._add_tracks(new_tracks - old_tracks, playlist) + self._delete_tracks(old_tracks - new_tracks, old_playlist) + elif playlist.uri == self.NEW_PLAYLIST_URI: + logger.debug(f"TidalPlaylistsProvider.save: new {playlist.uri}, {playlist.name}, {len(playlist.tracks)}") + self._create_new_playlist(playlist) else: - logger.info("Refreshing TIDAL playlists..") - - session = self.backend.session - plists = self._current_tidal_playlists - mapped_playlists = {} - playlist_cache = self._playlists if include_items else self._playlists_metadata - - for pl in plists: - uri = "tidal:playlist:" + pl.id - # Skip or cache hit case - if (uris and uri not in uris) or pl in playlist_cache: - continue - - # Cache miss case - if include_items: - pl_tracks = self._retrieve_api_tracks(session, pl) - tracks = full_models_mappers.create_mopidy_tracks(pl_tracks) - else: - # Create as many mock tracks as the number of items in the playlist. - # Playlist metadata is concerned only with the number of tracks, not - # the actual list. - tracks = [mock_track] * pl.num_tracks - - mapped_playlists[uri] = MopidyPlaylist( - uri=uri, - name=pl.name, - tracks=tracks, - last_modified=to_timestamp(pl.last_updated), - ) - - # When we trigger a playlists_loaded event the backend may call as_list - # again. Set an event in playlist_cache_refresh_secs seconds to ensure - # that we don't perform another playlist sync. 
- self._playlists_loaded_event.set() - playlist_cache_refresh_secs = self.backend._config["tidal"].get( - "playlist_cache_refresh_secs" - ) + logger.error(f"NotImplemented: TidalPlaylistsProvider.save({playlist})") + return playlist - if playlist_cache_refresh_secs: - Timer( - playlist_cache_refresh_secs, - lambda: self._playlists_loaded_event.clear(), - ).start() - - # Update the right playlist cache and send the playlists_loaded event. - playlist_cache.update(mapped_playlists) - backend.BackendListener.send("playlists_loaded") - logger.info("TIDAL playlists refreshed") - - @login_hack - def get_items(self, uri) -> Optional[List[Ref]]: - playlist = self._get_or_refresh_playlist(uri) - if not playlist: - return - - return [Ref.track(uri=t.uri, name=t.name) for t in playlist.tracks] - - def _retrieve_api_tracks(self, session, playlist): - getter_args = tuple() - return get_items(playlist.tracks, *getter_args) - - def save(self, playlist): - old_playlist = self._get_or_refresh_playlist(playlist.uri) - session = self.backend.session - playlist_id = playlist.uri.split(":")[-1] - assert old_playlist, f"No such playlist: {playlist.uri}" - assert session, "No active session" - upstream_playlist = session.playlist(playlist_id) - - # Playlist rename case - if old_playlist.name != playlist.name: - upstream_playlist.edit(title=playlist.name) - - additions = [] - removals = [] - remove_offset = 0 - diff_lines = difflib.ndiff( - [t.uri for t in old_playlist.tracks], [t.uri for t in playlist.tracks] - ) + def _add_tracks(self, tracks, playlist): + logger.error(f"NotImplemented: TidalPlaylistsProvider._add_tracks({tracks}, {playlist.uri}") + + def _delete_tracks(self, tracks, playlist): + logger.error(f"NotImplemented: TidalPlaylistsProvider._delete_tracks({tracks}, {playlist.uri}") - for diff_line in diff_lines: - if diff_line.startswith("+ "): - additions.append(diff_line[2:].split(":")[-1]) - else: - if diff_line.startswith("- "): - removals.append(remove_offset) - 
remove_offset += 1 - - # Process removals in descending order so we don't have to recalculate - # the offsets while we remove tracks - if removals: - logger.info( - 'Removing %d tracks from the playlist "%s"', - len(removals), - playlist.name, - ) - - removals.reverse() - for idx in removals: - upstream_playlist.remove_by_index(idx) - - # tidalapi currently only supports appending tracks to the end of the - # playlist - if additions: - logger.info( - 'Adding %d tracks to the playlist "%s"', len(additions), playlist.name - ) - - upstream_playlist.add(additions) - - # remove all defunct tracks from cache - self._calculate_added_and_removed_playlist_ids() - # force update the whole playlist so all state is good - self.refresh(playlist.uri) + def _create_new_playlist(self, playlist): + logger.error(f"NotImplemented: TidalPlaylistsProvider._create_new_playlist({playlist.tracks}, {playlist.name})") diff --git a/mopidy_tidal/ref_models_mappers.py b/mopidy_tidal/ref_models_mappers.py deleted file mode 100755 index 9bd2895..0000000 --- a/mopidy_tidal/ref_models_mappers.py +++ /dev/null @@ -1,141 +0,0 @@ -from __future__ import unicode_literals - -import logging - -from mopidy.models import Ref -from tidalapi import Album, Artist, Mix, Playlist, Track - -logger = logging.getLogger(__name__) - - -def create_root(): - return [ - # Ref.directory(uri="tidal:home", name="Home"), This page takes forever to load... 
- Ref.directory(uri="tidal:for_you", name="For You"), - Ref.directory(uri="tidal:explore", name="Explore"), - Ref.directory(uri="tidal:genres", name="Genres"), - Ref.directory(uri="tidal:moods", name="Moods"), - Ref.directory(uri="tidal:mixes", name="Mixes"), - Ref.directory(uri="tidal:my_artists", name="My Artists"), - Ref.directory(uri="tidal:my_albums", name="My Albums"), - Ref.directory(uri="tidal:my_playlists", name="My Playlists"), - Ref.directory(uri="tidal:my_tracks", name="My Tracks"), - ] - - -def create_artists(tidal_artists): - return [create_artist(a) for a in tidal_artists] - - -def create_artist(tidal_artist): - return Ref.artist( - uri="tidal:artist:" + str(tidal_artist.id), name=tidal_artist.name - ) - - -def create_playlists(tidal_playlists): - return [create_playlist(p) for p in tidal_playlists] - - -def create_playlist(tidal_playlist): - return Ref.playlist( - uri="tidal:playlist:" + str(tidal_playlist.id), name=tidal_playlist.name - ) - - -def create_moods(tidal_moods): - return [create_mood(m) for m in tidal_moods] - - -def create_mood(tidal_mood): - mood_id = tidal_mood.api_path.split("/")[-1] - return Ref.directory(uri="tidal:mood:" + mood_id, name=tidal_mood.title) - - -def create_genres(tidal_genres): - return [create_genre(m) for m in tidal_genres] - - -def create_genre(tidal_genre): - genre_id = tidal_genre.path - return Ref.directory(uri="tidal:genre:" + genre_id, name=tidal_genre.name) - - -def create_mixed_directory(tidal_mixed): - res = [create_mixed_entry(m) for m in tidal_mixed] - # Remove None/Unsupported entries - res_filtered = [i for i in res if i is not None] - return res_filtered - - -def create_mixed_entry(tidal_mixed): - if isinstance(tidal_mixed, Mix): - return Ref.playlist( - uri="tidal:mix:" + tidal_mixed.id, - name=f"{tidal_mixed.title} ({tidal_mixed.sub_title})", - ) - elif isinstance(tidal_mixed, Album): - return Ref.album( - uri="tidal:album:" + str(tidal_mixed.id), - name=f"{tidal_mixed.name} 
({tidal_mixed.artist.name})", - ) - elif isinstance(tidal_mixed, Playlist): - return Ref.playlist( - uri="tidal:playlist:" + str(tidal_mixed.id), - name=f"{tidal_mixed.name}", - ) - elif isinstance(tidal_mixed, Track): - return create_track(tidal_mixed) - elif isinstance(tidal_mixed, Artist): - return create_artist(tidal_mixed) - else: - if hasattr(tidal_mixed, "api_path"): - # Objects containing api_path are usually pages and must be processed further - return Ref.directory( - uri="tidal:page:" + tidal_mixed.api_path, name=tidal_mixed.title - ) - elif hasattr(tidal_mixed, "artifact_id"): - # Objects containing artifact_id can be viewed directly - explore_id = tidal_mixed.artifact_id - name = f"{tidal_mixed.short_header} ({tidal_mixed.short_sub_header})" - if tidal_mixed.type == "PLAYLIST": - return Ref.playlist( - uri="tidal:playlist:" + explore_id, - name=name, - ) - else: - # Unsupported type (eg. interview, exturl) - return None - else: - # Unsupported type (eg. Video) - return None - - -def create_mixes(tidal_mixes): - return [create_mix(m) for m in tidal_mixes] - - -def create_mix(tidal_mix): - return Ref.playlist( - uri="tidal:mix:" + tidal_mix.id, - name=f"{tidal_mix.title} ({tidal_mix.sub_title})", - ) - - -def create_albums(tidal_albums): - return [create_album(a) for a in tidal_albums] - - -def create_album(tidal_album): - return Ref.album(uri="tidal:album:" + str(tidal_album.id), name=tidal_album.name) - - -def create_tracks(tidal_tracks): - return [create_track(t) for t in tidal_tracks] - - -def create_track(tidal_track): - uri = "tidal:track:{0}:{1}:{2}".format( - tidal_track.artist.id, tidal_track.album.id, tidal_track.id - ) - return Ref.track(uri=uri, name=tidal_track.name) diff --git a/mopidy_tidal/search.py b/mopidy_tidal/search.py index 85798bb..c8d7b11 100755 --- a/mopidy_tidal/search.py +++ b/mopidy_tidal/search.py @@ -1,214 +1,96 @@ from __future__ import unicode_literals import logging -from collections import OrderedDict -from 
concurrent.futures import ThreadPoolExecutor +import operator +from collections import defaultdict from dataclasses import dataclass -from enum import IntEnum -from typing import ( - Callable, - Collection, - Iterable, - List, - Mapping, - Sequence, - Tuple, - Type, - Union, -) +from functools import partial +from typing import Type, Callable -from lru_cache import SearchCache -from tidalapi.album import Album -from tidalapi.artist import Artist -from tidalapi.media import Track +import tidalapi as tdl +from cachetools import cached, LRUCache +from cachetools.keys import hashkey -from mopidy_tidal.full_models_mappers import ( - create_mopidy_albums, - create_mopidy_artists, - create_mopidy_tracks, -) -from mopidy_tidal.utils import remove_watermark +from mopidy_tidal.models import Artist, Album, PlaylistAsAlbum, Track +from mopidy_tidal.workers import threaded, paginated logger = logging.getLogger(__name__) -class SearchField(IntEnum): - ANY = 0 - ARTIST = 1 - ALBUMARTIST = 2 - ALBUM = 3 - TITLE = 4 - - -@dataclass -class SearchFieldMeta: - field: SearchField - request_field: str - results_fields: Sequence[str] - model_classes: Collection[Type[Union[Artist, Album, Track]]] - mappers: Sequence[Callable[[Collection], Sequence]] - - -fields_meta = { - meta.field: meta - for meta in [ - SearchFieldMeta( - SearchField.ANY, - request_field="any", - results_fields=("artists", "albums", "tracks"), - model_classes=(Artist, Album, Track), - mappers=(create_mopidy_artists, create_mopidy_albums, create_mopidy_tracks), - ), - SearchFieldMeta( - SearchField.ARTIST, - request_field="artist", - results_fields=("artists",), - model_classes=(Artist,), - mappers=(create_mopidy_artists,), - ), - SearchFieldMeta( - SearchField.ALBUMARTIST, - request_field="albumartist", - results_fields=("artists",), - model_classes=(Artist,), - mappers=(create_mopidy_artists,), - ), - SearchFieldMeta( - SearchField.ALBUM, - request_field="album", - results_fields=("albums",), - model_classes=(Album,), 
- mappers=(create_mopidy_albums,), - ), - SearchFieldMeta( - SearchField.TITLE, - request_field="track_name", - results_fields=("tracks",), - model_classes=(Track,), - mappers=(create_mopidy_tracks,), - ), - ] +_search_fields = { + "any": (tdl.Album, tdl.Artist, tdl.Track, tdl.Playlist), + "album": (tdl.Album, ), + "artist": (tdl.Artist, ), + "albumartist": (tdl.Artist, ), + "performer": (tdl.Artist, ), + "composer": (tdl.Artist, ), + "track_name": (tdl.Track, ), } -def _get_flattened_query_and_field_meta( - query: Mapping[str, str] -) -> Tuple[str, SearchFieldMeta]: - q = " ".join( - query[field] - for field in ("any", "artist", "album", "track_name") - if query.get(field) - ) - - fields_by_request_field = { - field_meta.request_field: field_meta for field_meta in fields_meta.values() - } - - matched_field_meta = fields_by_request_field["any"] - for attr in ("track_name", "album", "artist", "albumartist"): - field_meta = fields_by_request_field.get(attr) - if field_meta and query.get(attr): - matched_field_meta = field_meta - break - - return q, matched_field_meta - - -def _get_exact_result( - query: Mapping, - results: Tuple[Iterable[Artist], Iterable[Album], Iterable[Track]], - field_meta: SearchFieldMeta, -) -> Tuple[List[Artist], List[Album], List[Track]]: - query_value = query[field_meta.request_field] - filtered_results = [], [], [] - - for i, attr in enumerate( - (SearchField.TITLE, SearchField.ALBUM, SearchField.ARTIST) - ): - if attr == field_meta.field: - item = next( - ( - res - # TODO: why not results[-i-1]? 
- for res in results[len(results) - i - 1] - if res.name and res.name.lower() == query_value.lower() - ), - None, - ) - - if item: - filtered_results[len(results) - i - 1].append(item) - break - - return filtered_results - - -def _expand_artist_top_tracks(artist: Artist) -> List[Track]: - return artist.get_top_tracks(limit=25) - - -def _expand_album_tracks(album: Album) -> List[Track]: - return album.tracks() - - -def _expand_results_tracks( - results: Tuple[List[Artist], List[Album], List[Track]], -) -> Tuple[List[Artist], List[Album], List[Track]]: - results_ = list(results) - artists = results_[0] - albums = results_[1] - - with ThreadPoolExecutor(4, thread_name_prefix="mopidy-tidal-search-") as pool: - pool_res = pool.map(_expand_artist_top_tracks, artists) - for tracks in pool_res: - results_[2].extend(tracks) - - pool_res = pool.map(_expand_album_tracks, albums) - for tracks in pool_res: - results_[2].extend(tracks) - - # Remove any duplicate tracks from results - tracks_by_id = OrderedDict({track.id: track for track in results_[2]}) - results_[2] = list(tracks_by_id.values()) - return tuple(results_) - - -@SearchCache -def tidal_search(session, query, exact=False): - logger.info("Searching Tidal for: %r", query) - query = query.copy() - - for field, value in query.items(): - if hasattr(value, "__iter__") and not isinstance(value, (str, bytes)): - value = value[0] - query[field] = remove_watermark(value) - - query_string, field_meta = _get_flattened_query_and_field_meta(query) - - results = [[], [], []] # artists, albums, tracks - api_results = session.search(query_string, models=field_meta.model_classes) - - for i, field_type in enumerate( - (SearchField.ARTIST, SearchField.ALBUM, SearchField.TITLE) - ): - meta = fields_meta[field_type] - results_field = meta.results_fields[0] - mapper = meta.mappers[0] +@dataclass +class ResultMeta: + from_k: str + from_type: Type + to_k: str + to_make: Callable + + +_result_map = ( + ResultMeta("artists", tdl.Artist, 
"artists", Artist.from_api, ), + ResultMeta("albums", tdl.Album, "albums", Album.from_api, ), + ResultMeta("tracks", tdl.Track, "tracks", Track.from_api, ), + ResultMeta("playlists", tdl.Playlist, "albums", PlaylistAsAlbum.from_api, ), +) - if not (results_field in api_results and results_field in meta.results_fields): - continue - results[i] = api_results[results_field] +def to_result(key): + return next(( + m + for m in _result_map + if operator.eq(key, m.from_k) or isinstance(key, m.from_type) + ), None) - if exact: - results = list(_get_exact_result(query, tuple(results), field_meta)) - _expand_results_tracks(results) - for i, field_type in enumerate( - (SearchField.ARTIST, SearchField.ALBUM, SearchField.TITLE) - ): - meta = fields_meta[field_type] - mapper = meta.mappers[0] - results[i] = mapper(results[i]) +@cached( + LRUCache(maxsize=128), + key=lambda *args, query, total, exact: hashkey( + hashkey(**{k: tuple(v) for k, v in query.items()}), + total, exact + ) +) +def tidal_search(session: tdl.Session, /, *, query, total, exact=False): + logger.info(f"Search query: {query!r}") + queries = { + _search_fields[k]: query.pop(k) + # this picks in order search fields + # and ignores further searches for same type + for k in reversed(_search_fields) + if k in query + } + if query: # other fields not mapped will go to playlist search + # pick first field and ignore subsequent since we can't squash keywords + queries[(tdl.Playlist, )] = next(v for v in query.values()) + + logger.info(f"Search translated query: {queries!r}") + results = defaultdict(list) + for thread in threaded(*( + partial(paginated, partial(session.search, q, models=m), total=total) + for m, q in queries.items() + )): + for page in thread: + top_hit = page.pop("top_hit", None) + if top_hit: + meta = to_result(top_hit) + if meta: + results[meta.to_k].append(meta.to_make(top_hit)) + for k, values in page.items(): + meta = to_result(k) + if meta: + results[meta.to_k].extend(meta.to_make(i) for i in 
page[k])
+
+    logger.info(f"Search results: {dict((k, len(v)) for k, v in results.items())!r}")
+    # threaded() is a generator: it must be drained, otherwise none of the
+    # build calls are ever submitted to the worker pool.
+    list(threaded(*(i.build for items in results.values() for i in items), max_workers=10))
+    logger.info("Search results built")
+    return {k: [i.full for i in v] for k, v in results.items()}
-    return tuple(results)
diff --git a/mopidy_tidal/uri.py b/mopidy_tidal/uri.py
new file mode 100644
index 0000000..b0557f5
--- /dev/null
+++ b/mopidy_tidal/uri.py
+from enum import unique, Enum
+from typing import NamedTuple, Optional, Any
+
+from mopidy_tidal import Extension
+
+
+@unique
+class URIType(Enum):
+    TRACK = "track"
+    ALBUM = "album"
+    ARTIST = "artist"
+    PLAYLIST = "playlist"
+    MIX = "mix"
+    PAGE = "page"
+    FUTURE = "future"
+    DIRECTORY = "directory"
+
+    def __str__(self):
+        return str(self.value)
+
+
+class URIData(NamedTuple):
+    uri: str
+    type: Any
+    id: Optional[str] = None
+
+
+class URI:
+    _ext = Extension.ext_name
+    _sep = ":"
+
+    def __init__(self, _type, _id: str = None):
+        uri = self._sep.join(map(str, filter(bool, (self._ext, _type, _id))))
+        self._data = URIData(uri, _type, _id)
+
+    @classmethod
+    def from_string(cls, uri):
+        _ext, _type, *_id = uri.split(cls._sep, 2)
+        # Compare against cls (not URI) so a subclass overriding _ext stays
+        # consistent with __init__, which reads self._ext.
+        if _ext != cls._ext:
+            return None
+        try:
+            _type = URIType(_type)
+        except ValueError:
+            pass
+        return cls(_type, *_id)
+
+    @property
+    def track(self):
+        if self.type == URIType.TRACK and self.id:
+            return self.id
+        raise AttributeError
+
+    @property
+    def playlist(self):
+        if self.type == URIType.PLAYLIST and self.id:
+            return self.id
+        raise AttributeError
+
+    @property
+    def mix(self):
+        if self.type == URIType.MIX and self.id:
+            return self.id
+        raise AttributeError
+
+    @property
+    def album(self):
+        if self.type == URIType.ALBUM and self.id:
+            return self.id
+        raise AttributeError
+
+    @property
+    def artist(self):
+        if self.type == URIType.ARTIST and self.id:
+            return self.id
+        raise AttributeError
+
+    @property
+    def page(self):
+        if self.type == URIType.PAGE and self.id:
return self.id + raise AttributeError + + @property + def future(self): + if self.type == URIType.FUTURE and self.id: + return self.id + raise AttributeError + + def __getattr__(self, item): + return getattr(self._data, item) + + def __str__(self): + return self.uri diff --git a/mopidy_tidal/utils.py b/mopidy_tidal/utils.py index 146ae7f..cf3c7ef 100644 --- a/mopidy_tidal/utils.py +++ b/mopidy_tidal/utils.py @@ -1,8 +1,4 @@ -from mopidy.models import Track - watermark = " [TIDAL]" -mock_track = Track(uri="tidal:track:0:0:0", artists=[], name=None) - def apply_watermark(val): return val + watermark diff --git a/mopidy_tidal/workers.py b/mopidy_tidal/workers.py index 33d3906..d1c581f 100644 --- a/mopidy_tidal/workers.py +++ b/mopidy_tidal/workers.py @@ -1,59 +1,43 @@ -from concurrent.futures import ThreadPoolExecutor -from typing import Callable - - -def func_wrapper(args): - (f, offset, *args) = args - items = f(*args) - return list((i + offset, item) for i, item in enumerate(items)) - - -def get_items( - func: Callable, - *args, - parse: Callable = lambda _: _, - chunk_size: int = 100, - processes: int = 5, -): - """ - This function performs pagination on a function that supports - `limit`/`offset` parameters and it runs API requests in parallel to speed - things up. 
-    """
-    items = []
-    offsets = [-chunk_size]
-    remaining = chunk_size * processes
-
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from functools import partial
+
+TIDAL_PAGE_SIZE = 50  # Highly recommended
+
+
+def paginated(call, limit=TIDAL_PAGE_SIZE, total=None):
+    if total:
+        # ceil(total / limit) pages, fetched in parallel, yielded in order
+        pages = (total // limit) + min(1, total % limit)
+        for items in sorted_threaded(*(
+            partial(call, limit=limit, offset=limit * idx)
+            for idx in range(pages)
+        )):
+            yield items
+    else:
+        # unknown total: fetch sequentially until a short page signals the end
+        idx = 0
+        while True:
+            results = call(limit=limit, offset=limit * idx)
+            yield results
+            if len(results) < limit:
+                break
+            idx += 1
+
+
+def _threaded(*args, max_workers=None):
+    thread_count = len(args)
     with ThreadPoolExecutor(
-        processes, thread_name_prefix=f"mopidy-tidal-{func.__name__}-"
-    ) as pool:
-        while remaining == chunk_size * processes:
-            offsets = [offsets[-1] + chunk_size * (i + 1) for i in range(processes)]
-
-            pool_results = pool.map(
-                func_wrapper,
-                [
-                    (
-                        func,
-                        offset,
-                        *args,
-                        chunk_size,  # limit
-                        offset,  # offset
-                    )
-                    for offset in offsets
-                ],
-            )
-
-            new_items = []
-            for results in pool_results:
-                new_items.extend(results)
-
-            remaining = len(new_items)
-            items.extend(new_items)
-
-    items = [_ for _ in items if _]
-    sorted_items = list(
-        map(lambda item: item[1], sorted(items, key=lambda item: item[0]))
-    )
-
-    return list(map(parse, sorted_items))
+        max_workers=min(max_workers, thread_count) if max_workers else thread_count,
+        thread_name_prefix="mopidy-tidal-split-",
+    ) as executor:
+        futures = {executor.submit(call): call for call in args}
+        # yield (callable, result) pairs in completion order
+        for future in as_completed(futures):
+            yield futures[future], future.result()
+
+
+def threaded(*args, **kwargs):
+    for _, result in _threaded(*args, **kwargs):
+        yield result
+
+
+def sorted_threaded(*args, **kwargs):
+    # map each submitted callable back to its result to restore input order
+    results = dict(_threaded(*args, **kwargs))
+    return [results[call] for call in args]
diff --git a/pyproject.toml b/pyproject.toml
index 44670a4..0ea6527 100644
--- a/pyproject.toml
+++ 
b/pyproject.toml @@ -18,6 +18,7 @@ classifiers = [ [tool.poetry.dependencies] python = "^3.9" +cachetools = "^5.3.0" Mopidy = "^3.0" tidalapi = "^0.7.3"