From 8899fee7fd26de9f2ebf0b4285767cb987f2d279 Mon Sep 17 00:00:00 2001 From: sigma67 Date: Sun, 31 Dec 2023 19:55:38 +0100 Subject: [PATCH] add mypy --- .github/workflows/lint.yml | 13 ++++- .pre-commit-config.yaml | 17 +++---- CONTRIBUTING.rst | 4 +- pdm.lock | 77 ++++++++++++++++++++++++++++- pyproject.toml | 7 +++ tests/test.py | 1 + ytmusicapi/auth/browser.py | 3 +- ytmusicapi/auth/oauth/base.py | 76 +++++++++++++++------------- ytmusicapi/auth/oauth/refreshing.py | 9 ++-- ytmusicapi/mixins/_protocol.py | 28 +++++++++++ ytmusicapi/mixins/browsing.py | 17 ++++--- ytmusicapi/mixins/explore.py | 11 +++-- ytmusicapi/mixins/library.py | 30 +++++------ ytmusicapi/mixins/playlists.py | 38 ++++++++------ ytmusicapi/mixins/search.py | 11 +++-- ytmusicapi/mixins/uploads.py | 11 +++-- ytmusicapi/mixins/watch.py | 19 ++++--- ytmusicapi/navigation.py | 4 +- ytmusicapi/parsers/i18n.py | 6 +-- ytmusicapi/parsers/playlists.py | 7 ++- ytmusicapi/parsers/watch.py | 5 +- ytmusicapi/setup.py | 18 +++---- ytmusicapi/ytmusic.py | 48 ++++++++++++------ 23 files changed, 312 insertions(+), 148 deletions(-) create mode 100644 ytmusicapi/mixins/_protocol.py diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 978318d7..e5508d13 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -1,4 +1,4 @@ -name: Ruff +name: lint on: pull_request: @@ -13,4 +13,13 @@ jobs: - uses: chartboost/ruff-action@v1 - uses: chartboost/ruff-action@v1 with: - args: format --check \ No newline at end of file + args: format --check + mypy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v3 + with: + python-version: "3.11" + - run: pip install mypy==1.8.0 + - run: mypy --install-types --non-interactive \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b7e27caf..95407a2d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,10 +1,9 @@ repos: -- repo: https://github.com/pre-commit/mirrors-yapf - rev: v0.32.0 - hooks: - - id: yapf - additional_dependencies: [toml] -- repo: https://github.com/PyCQA/flake8 - rev: 6.0.0 - hooks: - - id: flake8 +- repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.1.9 + hooks: + # Run the linter. + - id: ruff + # Run the formatter. + - id: ruff-format \ No newline at end of file diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index c212c22c..8d0c2f17 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -17,7 +17,9 @@ Before making changes to the code, install the development requirements using .. code-block:: - pip install -e .[dev] + pip install pipx + pipx install pdm pre-commit + pdm install Before committing, stage your files and run style and linter checks: diff --git a/pdm.lock b/pdm.lock index 73114f5c..0e1fff8e 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "dev"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:e8454691de99746f0fe6ba42b53f33bd3f0b8928021df648441b496e8c9a2f11" +content_hash = "sha256:b26172c266ad5d5a9ad9604af3348c8f2577fd45938709849735bda1cdbd9f2d" [[package]] name = "alabaster" @@ -324,6 +324,58 @@ files = [ {file = "MarkupSafe-2.1.3.tar.gz", hash = "sha256:af598ed32d6ae86f1b747b82783958b1a4ab8f617b06fe68795c7f026abbdcad"}, ] +[[package]] +name = "mypy" +version = "1.8.0" +requires_python = ">=3.8" +summary = "Optional static typing for Python" +groups = ["dev"] +dependencies = [ + "mypy-extensions>=1.0.0", + "tomli>=1.1.0; python_version < \"3.11\"", + "typing-extensions>=4.1.0", +] +files = [ + {file = "mypy-1.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:485a8942f671120f76afffff70f259e1cd0f0cfe08f81c05d8816d958d4577d3"}, + {file = "mypy-1.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:df9824ac11deaf007443e7ed2a4a26bebff98d2bc43c6da21b2b64185da011c4"}, + {file = "mypy-1.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2afecd6354bbfb6e0160f4e4ad9ba6e4e003b767dd80d85516e71f2e955ab50d"}, + {file = "mypy-1.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8963b83d53ee733a6e4196954502b33567ad07dfd74851f32be18eb932fb1cb9"}, + {file = "mypy-1.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:e46f44b54ebddbeedbd3d5b289a893219065ef805d95094d16a0af6630f5d410"}, + {file = "mypy-1.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:855fe27b80375e5c5878492f0729540db47b186509c98dae341254c8f45f42ae"}, + {file = "mypy-1.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:4c886c6cce2d070bd7df4ec4a05a13ee20c0aa60cb587e8d1265b6c03cf91da3"}, + {file = "mypy-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d19c413b3c07cbecf1f991e2221746b0d2a9410b59cb3f4fb9557f0365a1a817"}, + {file = "mypy-1.8.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:9261ed810972061388918c83c3f5cd46079d875026ba97380f3e3978a72f503d"}, + {file = "mypy-1.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:51720c776d148bad2372ca21ca29256ed483aa9a4cdefefcef49006dff2a6835"}, + {file = "mypy-1.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:52825b01f5c4c1c4eb0db253ec09c7aa17e1a7304d247c48b6f3599ef40db8bd"}, + {file = "mypy-1.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:f5ac9a4eeb1ec0f1ccdc6f326bcdb464de5f80eb07fb38b5ddd7b0de6bc61e55"}, + {file = "mypy-1.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afe3fe972c645b4632c563d3f3eff1cdca2fa058f730df2b93a35e3b0c538218"}, + {file = "mypy-1.8.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:42c6680d256ab35637ef88891c6bd02514ccb7e1122133ac96055ff458f93fc3"}, + {file = "mypy-1.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:720a5ca70e136b675af3af63db533c1c8c9181314d207568bbe79051f122669e"}, + {file = "mypy-1.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:028cf9f2cae89e202d7b6593cd98db6759379f17a319b5faf4f9978d7084cdc6"}, + {file = "mypy-1.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4e6d97288757e1ddba10dd9549ac27982e3e74a49d8d0179fc14d4365c7add66"}, + {file = "mypy-1.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7f1478736fcebb90f97e40aff11a5f253af890c845ee0c850fe80aa060a267c6"}, + {file = "mypy-1.8.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:42419861b43e6962a649068a61f4a4839205a3ef525b858377a960b9e2de6e0d"}, + {file = "mypy-1.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:2b5b6c721bd4aabaadead3a5e6fa85c11c6c795e0c81a7215776ef8afc66de02"}, + {file = "mypy-1.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:5c1538c38584029352878a0466f03a8ee7547d7bd9f641f57a0f3017a7c905b8"}, + {file = "mypy-1.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4ef4be7baf08a203170f29e89d79064463b7fc7a0908b9d0d5114e8009c3a259"}, + {file = "mypy-1.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7178def594014aa6c35a8ff411cf37d682f428b3b5617ca79029d8ae72f5402b"}, + {file = "mypy-1.8.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:ab3c84fa13c04aeeeabb2a7f67a25ef5d77ac9d6486ff33ded762ef353aa5592"}, + {file = "mypy-1.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:99b00bc72855812a60d253420d8a2eae839b0afa4938f09f4d2aa9bb4654263a"}, + {file = "mypy-1.8.0-py3-none-any.whl", hash = "sha256:538fd81bb5e430cc1381a443971c0475582ff9f434c16cd46d2c66763ce85d9d"}, + {file = "mypy-1.8.0.tar.gz", hash = "sha256:6ff8b244d7085a0b425b56d327b480c3b29cafbd2eff27316a004f9a7391ae07"}, +] + +[[package]] +name = "mypy-extensions" +version = "1.0.0" +requires_python = ">=3.5" +summary = "Type system extensions for programs checked with the mypy type checker." +groups = ["dev"] +files = [ + {file = "mypy_extensions-1.0.0-py3-none-any.whl", hash = "sha256:4392f6c0eb8a5668a69e23d168ffa70f0be9ccfd32b5cc2d26a34ae5b844552d"}, + {file = "mypy_extensions-1.0.0.tar.gz", hash = "sha256:75dbf8955dc00442a438fc4d0666508a9a97b6bd41aa2f0ffe9d2f2725af0782"}, +] + [[package]] name = "packaging" version = "23.2" @@ -536,6 +588,29 @@ files = [ {file = "sphinxcontrib_serializinghtml-1.1.5-py2.py3-none-any.whl", hash = "sha256:352a9a00ae864471d3a7ead8d7d79f5fc0b57e8b3f95e9867eb9eb28999b92fd"}, ] +[[package]] +name = "tomli" +version = "2.0.1" +requires_python = ">=3.7" +summary = "A lil' TOML parser" +groups = ["dev"] +marker = "python_version < \"3.11\"" +files = [ + {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, + {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, +] + +[[package]] +name = "typing-extensions" +version = "4.9.0" +requires_python = ">=3.8" +summary = "Backported and Experimental Type Hints for Python 3.8+" +groups = ["dev"] +files = [ + {file = "typing_extensions-4.9.0-py3-none-any.whl", hash = "sha256:af72aea155e91adfc61c3ae9e0e342dbc0cba726d6cba4b6c72c1f34e47291cd"}, + {file = "typing_extensions-4.9.0.tar.gz", hash = "sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783"}, +] + [[package]] name = "urllib3" version = "2.1.0" diff --git a/pyproject.toml b/pyproject.toml index 314a7032..5b5c5609 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,10 +44,17 @@ extend-select = [ "I", # isort ] +[tool.mypy] +files = [ + "ytmusicapi/" +] +mypy_path = "ytmusicapi" + [tool.pdm.dev-dependencies] dev = [ "coverage>=7.4.0", 'sphinx<7', 'sphinx-rtd-theme', "ruff>=0.1.9", + "mypy>=1.8.0", ] \ No newline at end of file diff --git a/tests/test.py b/tests/test.py index b223ae57..67d4fb17 100644 --- a/tests/test.py +++ b/tests/test.py @@ -327,6 +327,7 @@ def test_get_search_suggestions(self): # add search term to history first_pass = self.yt_auth.search("b") self.assertGreater(len(first_pass), 0) + time.sleep(3) # get results results = self.yt_auth.get_search_suggestions("b", detailed_runs=True) self.assertGreater(len(results), 0) diff --git a/ytmusicapi/auth/browser.py b/ytmusicapi/auth/browser.py index 34dda063..5ad13757 100644 --- a/ytmusicapi/auth/browser.py +++ b/ytmusicapi/auth/browser.py @@ -1,5 +1,6 @@ import os import platform +from typing import Optional from requests.structures import CaseInsensitiveDict @@ -13,7 +14,7 @@ def is_browser(headers: CaseInsensitiveDict) -> bool: return all(key in headers for key in browser_structure) -def setup_browser(filepath=None, headers_raw=None): +def setup_browser(filepath: Optional[str] = None, headers_raw: Optional[str] = None) -> str: contents = [] if not headers_raw: eof = "Ctrl-D" if platform.system() != "Windows" else "'Enter, Ctrl-Z, Enter'" diff --git a/ytmusicapi/auth/oauth/base.py b/ytmusicapi/auth/oauth/base.py index c92c4362..dbbe4c6d 100644 --- a/ytmusicapi/auth/oauth/base.py +++ b/ytmusicapi/auth/oauth/base.py @@ -1,6 +1,7 @@ import json import time -from typing import Dict, Optional +from abc import ABC +from typing import Mapping, Optional from requests.structures import CaseInsensitiveDict @@ -13,7 +14,7 @@ class Credentials: client_id: str client_secret: str - def get_code(self) -> Dict: + def get_code(self) -> Mapping: raise NotImplementedError() def token_from_code(self, device_code: str) -> RefreshableTokenDict: @@ -23,17 +24,17 @@ def refresh_token(self, refresh_token: str) -> BaseTokenDict: raise NotImplementedError() -class Token: +class Token(ABC): """Base class representation of the YouTubeMusicAPI OAuth token.""" - access_token: str - refresh_token: str - expires_in: int - expires_at: int - is_expiring: bool + _access_token: str + _refresh_token: str + _expires_in: int + _expires_at: int + _is_expiring: bool - scope: DefaultScope - token_type: Bearer + _scope: DefaultScope + _token_type: Bearer def __repr__(self) -> str: """Readable version.""" @@ -57,6 +58,34 @@ def as_auth(self) -> str: """Returns Authorization header ready str of token_type and access_token.""" return f"{self.token_type} {self.access_token}" + @property + def access_token(self) -> str: + return self._access_token + + @property + def refresh_token(self) -> str: + return self._refresh_token + + @property + def token_type(self) -> Bearer: + return self._token_type + + @property + def scope(self) -> DefaultScope: + return self._scope + + @property + def expires_at(self) -> int: + return self._expires_at + + @property + def expires_in(self) -> int: + return self._expires_in + + @property + def is_expiring(self) -> bool: + return self.expires_in < 60 + class OAuthToken(Token): """Wrapper for an OAuth token implementing expiration methods.""" @@ -68,7 +97,7 @@ def __init__( scope: str, token_type: str, expires_at: Optional[int] = None, - expires_in: Optional[int] = None, + expires_in: int = 0, ): """ @@ -84,10 +113,11 @@ def __init__( self._access_token = access_token self._refresh_token = refresh_token self._scope = scope + self._token_type = token_type # set/calculate token expiration using current epoch - self._expires_at: int = expires_at if expires_at else int(time.time() + expires_in) - self._token_type = token_type + self._expires_at: int = expires_at if expires_at else int(time.time()) + expires_in + self._expires_in: int = expires_in @staticmethod def is_oauth(headers: CaseInsensitiveDict) -> bool: @@ -109,26 +139,6 @@ def update(self, fresh_access: BaseTokenDict): self._access_token = fresh_access["access_token"] self._expires_at = int(time.time() + fresh_access["expires_in"]) - @property - def access_token(self) -> str: - return self._access_token - - @property - def refresh_token(self) -> str: - return self._refresh_token - - @property - def token_type(self) -> Bearer: - return self._token_type - - @property - def scope(self) -> DefaultScope: - return self._scope - - @property - def expires_at(self) -> int: - return self._expires_at - @property def expires_in(self) -> int: return int(self.expires_at - time.time()) diff --git a/ytmusicapi/auth/oauth/refreshing.py b/ytmusicapi/auth/oauth/refreshing.py index 2d48e8a5..e6243b52 100644 --- a/ytmusicapi/auth/oauth/refreshing.py +++ b/ytmusicapi/auth/oauth/refreshing.py @@ -47,6 +47,10 @@ def __init__(self, token: OAuthToken, credentials: Credentials, local_cache: Opt # values to new file location via setter self._local_cache = local_cache + @property + def token_type(self) -> Bearer: + return self.token.token_type + @property def local_cache(self) -> str | None: return self._local_cache @@ -78,11 +82,6 @@ def store_token(self, path: Optional[str] = None) -> None: with open(file_path, encoding="utf8", mode="w") as file: json.dump(self.token.as_dict(), file, indent=True) - @property - def token_type(self) -> Bearer: - # pass underlying value - return self.token.token_type - def as_dict(self) -> RefreshableTokenDict: # override base class method with call to underlying token's method return self.token.as_dict() diff --git a/ytmusicapi/mixins/_protocol.py b/ytmusicapi/mixins/_protocol.py new file mode 100644 index 00000000..41581b29 --- /dev/null +++ b/ytmusicapi/mixins/_protocol.py @@ -0,0 +1,28 @@ +"""protocol that defines the functions available to mixins""" +from typing import Dict, Optional, Protocol + +from requests import Response + +from ytmusicapi.auth.types import AuthType +from ytmusicapi.parsers.i18n import Parser + + +class MixinProtocol(Protocol): + """protocol that defines the functions available to mixins""" + + auth_type: AuthType + + parser: Parser + + headers: Dict[str, str] + + proxies: Optional[Dict[str, str]] + + def _check_auth(self) -> None: + pass + + def _send_request(self, endpoint: str, body: Dict, additionalParams: str = "") -> Dict: + pass + + def _send_get_request(self, url: str, params: Optional[Dict] = None) -> Response: + pass diff --git a/ytmusicapi/mixins/browsing.py b/ytmusicapi/mixins/browsing.py index 2c99c823..f5b6e3b5 100644 --- a/ytmusicapi/mixins/browsing.py +++ b/ytmusicapi/mixins/browsing.py @@ -1,16 +1,19 @@ -from typing import Dict, List +import re +from typing import Any, Dict, List, Optional from ytmusicapi.continuations import get_continuations from ytmusicapi.helpers import YTM_DOMAIN, sum_total_duration from ytmusicapi.parsers.albums import parse_album_header -from ytmusicapi.parsers.browsing import * +from ytmusicapi.parsers.browsing import parse_album, parse_content_list, parse_mixed_content, parse_playlist from ytmusicapi.parsers.library import parse_albums from ytmusicapi.parsers.playlists import parse_playlist_items +from ..navigation import * +from ._protocol import MixinProtocol from ._utils import get_datestamp -class BrowsingMixin: +class BrowsingMixin(MixinProtocol): def get_home(self, limit=3) -> List[Dict]: """ Get the home page. @@ -217,7 +220,7 @@ def get_artist(self, channelId: str) -> Dict: response = self._send_request(endpoint, body) results = nav(response, SINGLE_COLUMN_TAB + SECTION_LIST) - artist = {"description": None, "views": None} + artist: Dict[str, Any] = {"description": None, "views": None} header = response["header"]["musicImmersiveHeaderRenderer"] artist["name"] = nav(header, TITLE_TEXT) descriptionShelf = find_object_by_key(results, DESCRIPTION_SHELF[0], is_key=True) @@ -434,7 +437,7 @@ def get_album(self, browseId: str) -> Dict: return album - def get_song(self, videoId: str, signatureTimestamp: int = None) -> Dict: + def get_song(self, videoId: str, signatureTimestamp: Optional[int] = None) -> Dict: """ Returns metadata and streaming information about a song or video. @@ -743,7 +746,7 @@ def get_basejs_url(self): return YTM_DOMAIN + match.group(1) - def get_signatureTimestamp(self, url: str = None) -> int: + def get_signatureTimestamp(self, url: Optional[str] = None) -> int: """ Fetch the `base.js` script from YouTube Music and parse out the `signatureTimestamp` for use with :py:func:`get_song`. @@ -793,7 +796,7 @@ def get_tasteprofile(self) -> Dict: } return taste_profiles - def set_tasteprofile(self, artists: List[str], taste_profile: Dict = None) -> None: + def set_tasteprofile(self, artists: List[str], taste_profile: Optional[Dict] = None) -> None: """ Favorites artists to see more recommendations from the artist. Use :py:func:`get_tasteprofile` to see which artists are available to be recommended diff --git a/ytmusicapi/mixins/explore.py b/ytmusicapi/mixins/explore.py index c56266c2..6cfa7e6e 100644 --- a/ytmusicapi/mixins/explore.py +++ b/ytmusicapi/mixins/explore.py @@ -1,9 +1,10 @@ -from typing import Dict, List +from typing import Any, Dict, List +from ytmusicapi.mixins._protocol import MixinProtocol from ytmusicapi.parsers.explore import * -class ExploreMixin: +class ExploreMixin(MixinProtocol): def get_mood_categories(self) -> Dict: """ Fetch "Moods & Genres" categories from YouTube Music. @@ -49,7 +50,7 @@ def get_mood_categories(self) -> Dict: } """ - sections = {} + sections: Dict[str, Any] = {} response = self._send_request("browse", {"browseId": "FEmusic_moods_and_genres"}) for section in nav(response, SINGLE_COLUMN_TAB + SECTION_LIST): title = nav(section, GRID + ["header", "gridHeaderRenderer"] + TITLE_TEXT) @@ -187,13 +188,13 @@ def get_charts(self, country: str = "ZZ") -> Dict: } """ - body = {"browseId": "FEmusic_charts"} + body: Dict[str, Any] = {"browseId": "FEmusic_charts"} if country: body["formData"] = {"selectedValues": [country]} response = self._send_request("browse", body) results = nav(response, SINGLE_COLUMN_TAB + SECTION_LIST) - charts = {"countries": {}} + charts: Dict[str, Any] = {"countries": {}} menu = nav( results[0], MUSIC_SHELF diff --git a/ytmusicapi/mixins/library.py b/ytmusicapi/mixins/library.py index 3633960e..bde3b7a7 100644 --- a/ytmusicapi/mixins/library.py +++ b/ytmusicapi/mixins/library.py @@ -1,14 +1,15 @@ from random import randint -from typing import Dict, List +from typing import Dict, List, Optional from ytmusicapi.continuations import * from ytmusicapi.parsers.browsing import * from ytmusicapi.parsers.library import * +from ._protocol import MixinProtocol from ._utils import * -class LibraryMixin: +class LibraryMixin(MixinProtocol): def get_library_playlists(self, limit: int = 25) -> List[Dict]: """ Retrieves the playlists in the user's library. @@ -44,7 +45,7 @@ def get_library_playlists(self, limit: int = 25) -> List[Dict]: return playlists def get_library_songs( - self, limit: int = 25, validate_responses: bool = False, order: str = None + self, limit: int = 25, validate_responses: bool = False, order: Optional[str] = None ) -> List[Dict]: """ Gets the songs in the user's library (liked videos are not included). @@ -114,7 +115,7 @@ def get_library_songs( return songs - def get_library_albums(self, limit: int = 25, order: str = None) -> List[Dict]: + def get_library_albums(self, limit: int = 25, order: Optional[str] = None) -> List[Dict]: """ Gets the albums in the user's library. @@ -149,7 +150,7 @@ def get_library_albums(self, limit: int = 25, order: str = None) -> List[Dict]: response, lambda additionalParams: self._send_request(endpoint, body, additionalParams), limit ) - def get_library_artists(self, limit: int = 25, order: str = None) -> List[Dict]: + def get_library_artists(self, limit: int = 25, order: Optional[str] = None) -> List[Dict]: """ Gets the artists of the songs in the user's library. @@ -177,7 +178,7 @@ def get_library_artists(self, limit: int = 25, order: str = None) -> List[Dict]: response, lambda additionalParams: self._send_request(endpoint, body, additionalParams), limit ) - def get_library_subscriptions(self, limit: int = 25, order: str = None) -> List[Dict]: + def get_library_subscriptions(self, limit: int = 25, order: Optional[str] = None) -> List[Dict]: """ Gets the artists the user has subscribed to. @@ -196,15 +197,6 @@ def get_library_subscriptions(self, limit: int = 25, order: str = None) -> List[ response, lambda additionalParams: self._send_request(endpoint, body, additionalParams), limit ) - def get_liked_songs(self, limit: int = 100) -> Dict: - """ - Gets playlist items for the 'Liked Songs' playlist - - :param limit: How many items to return. Default: 100 - :return: List of playlistItem dictionaries. See :py:func:`get_playlist` - """ - return self.get_playlist("LM", limit) - def get_history(self) -> List[Dict]: """ Gets your play history in reverse chronological order @@ -260,7 +252,7 @@ def remove_history_items(self, feedbackTokens: List[str]) -> Dict: # pragma: no return response - def rate_song(self, videoId: str, rating: str = "INDIFFERENT") -> Dict: + def rate_song(self, videoId: str, rating: str = "INDIFFERENT") -> Optional[Dict]: """ Rates a song ("thumbs up"/"thumbs down" interactions on YouTube Music) @@ -275,11 +267,11 @@ def rate_song(self, videoId: str, rating: str = "INDIFFERENT") -> Dict: body = {"target": {"videoId": videoId}} endpoint = prepare_like_endpoint(rating) if endpoint is None: - return + return None return self._send_request(endpoint, body) - def edit_song_library_status(self, feedbackTokens: List[str] = None) -> Dict: + def edit_song_library_status(self, feedbackTokens: Optional[List[str]] = None) -> Dict: """ Adds or removes a song from your library depending on the token provided. @@ -290,7 +282,7 @@ def edit_song_library_status(self, feedbackTokens: List[str] = None) -> Dict: self._check_auth() body = {"feedbackTokens": feedbackTokens} endpoint = "feedback" - return endpoint if not endpoint else self._send_request(endpoint, body) + return self._send_request(endpoint, body) def rate_playlist(self, playlistId: str, rating: str = "INDIFFERENT") -> Dict: """ diff --git a/ytmusicapi/mixins/playlists.py b/ytmusicapi/mixins/playlists.py index a7bcc0c4..c0a7b596 100644 --- a/ytmusicapi/mixins/playlists.py +++ b/ytmusicapi/mixins/playlists.py @@ -1,4 +1,4 @@ -from typing import Dict, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union from ytmusicapi.continuations import * from ytmusicapi.helpers import sum_total_duration, to_int @@ -6,10 +6,11 @@ from ytmusicapi.parsers.browsing import parse_content_list, parse_playlist from ytmusicapi.parsers.playlists import * +from ._protocol import MixinProtocol from ._utils import * -class PlaylistsMixin: +class PlaylistsMixin(MixinProtocol): def get_playlist( self, playlistId: str, limit: int = 100, related: bool = False, suggestions_limit: int = 0 ) -> Dict: @@ -196,13 +197,22 @@ def get_playlist( playlist["duration_seconds"] = sum_total_duration(playlist) return playlist + def get_liked_songs(self, limit: int = 100) -> Dict: + """ + Gets playlist items for the 'Liked Songs' playlist + + :param limit: How many items to return. Default: 100 + :return: List of playlistItem dictionaries. See :py:func:`get_playlist` + """ + return self.get_playlist("LM", limit) + def create_playlist( self, title: str, description: str, privacy_status: str = "PRIVATE", - video_ids: List = None, - source_playlist: str = None, + video_ids: Optional[List] = None, + source_playlist: Optional[str] = None, ) -> Union[str, Dict]: """ Creates a new empty playlist and returns its id. @@ -233,11 +243,11 @@ def create_playlist( def edit_playlist( self, playlistId: str, - title: str = None, - description: str = None, - privacyStatus: str = None, - moveItem: Tuple[str, str] = None, - addPlaylistId: str = None, + title: Optional[str] = None, + description: Optional[str] = None, + privacyStatus: Optional[str] = None, + moveItem: Optional[Tuple[str, str]] = None, + addPlaylistId: Optional[str] = None, addToTop: Optional[bool] = None, ) -> Union[str, Dict]: """ @@ -255,7 +265,7 @@ def edit_playlist( :return: Status String or full response """ self._check_auth() - body = {"playlistId": validate_playlist_id(playlistId)} + body: Dict[str, Any] = {"playlistId": validate_playlist_id(playlistId)} actions = [] if title: actions.append({"action": "ACTION_SET_PLAYLIST_NAME", "playlistName": title}) @@ -305,8 +315,8 @@ def delete_playlist(self, playlistId: str) -> Union[str, Dict]: def add_playlist_items( self, playlistId: str, - videoIds: List[str] = None, - source_playlist: str = None, + videoIds: Optional[List[str]] = None, + source_playlist: Optional[str] = None, duplicates: bool = False, ) -> Union[str, Dict]: """ @@ -319,7 +329,7 @@ def add_playlist_items( :return: Status String and a dict containing the new setVideoId for each videoId or full response """ self._check_auth() - body = {"playlistId": validate_playlist_id(playlistId), "actions": []} + body: Dict[str, Any] = {"playlistId": validate_playlist_id(playlistId), "actions": []} if not videoIds and not source_playlist: raise Exception("You must provide either videoIds or a source_playlist to add to the playlist") @@ -363,7 +373,7 @@ def remove_playlist_items(self, playlistId: str, videos: List[Dict]) -> Union[st if len(videos) == 0: raise Exception("Cannot remove songs, because setVideoId is missing. Do you own this playlist?") - body = {"playlistId": validate_playlist_id(playlistId), "actions": []} + body: Dict[str, Any] = {"playlistId": validate_playlist_id(playlistId), "actions": []} for video in videos: body["actions"].append( { diff --git a/ytmusicapi/mixins/search.py b/ytmusicapi/mixins/search.py index 783a48b3..ac9699c6 100644 --- a/ytmusicapi/mixins/search.py +++ b/ytmusicapi/mixins/search.py @@ -1,15 +1,16 @@ -from typing import Dict, List, Union +from typing import Any, Dict, List, Optional, Union from ytmusicapi.continuations import get_continuations +from ytmusicapi.mixins._protocol import MixinProtocol from ytmusicapi.parsers.search import * -class SearchMixin: +class SearchMixin(MixinProtocol): def search( self, query: str, - filter: str = None, - scope: str = None, + filter: Optional[str] = None, + scope: Optional[str] = None, limit: int = 20, ignore_spelling: bool = False, ) -> List[Dict]: @@ -134,7 +135,7 @@ def search( """ body = {"query": query} endpoint = "search" - search_results = [] + search_results: List[Dict[str, Any]] = [] filters = [ "albums", "artists", diff --git a/ytmusicapi/mixins/uploads.py b/ytmusicapi/mixins/uploads.py index 1f87842d..98955a28 100644 --- a/ytmusicapi/mixins/uploads.py +++ b/ytmusicapi/mixins/uploads.py @@ -1,6 +1,6 @@ import ntpath import os -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union import requests @@ -17,11 +17,12 @@ from ytmusicapi.parsers.uploads import parse_uploaded_items from ..auth.types import AuthType +from ._protocol import MixinProtocol from ._utils import prepare_order_params, validate_order_parameter -class UploadsMixin: - def get_library_upload_songs(self, limit: int = 25, order: str = None) -> List[Dict]: +class UploadsMixin(MixinProtocol): + def get_library_upload_songs(self, limit: int = 25, order: Optional[str] = None) -> List[Dict]: """ Returns a list of uploaded songs @@ -68,7 +69,7 @@ def get_library_upload_songs(self, limit: int = 25, order: str = None) -> List[D return songs - def get_library_upload_albums(self, limit: int = 25, order: str = None) -> List[Dict]: + def get_library_upload_albums(self, limit: int = 25, order: Optional[str] = None) -> List[Dict]: """ Gets the albums of uploaded songs in the user's library. @@ -87,7 +88,7 @@ def get_library_upload_albums(self, limit: int = 25, order: str = None) -> List[ response, lambda additionalParams: self._send_request(endpoint, body, additionalParams), limit ) - def get_library_upload_artists(self, limit: int = 25, order: str = None) -> List[Dict]: + def get_library_upload_artists(self, limit: int = 25, order: Optional[str] = None) -> List[Dict]: """ Gets the artists of uploaded songs in the user's library. diff --git a/ytmusicapi/mixins/watch.py b/ytmusicapi/mixins/watch.py index 4e5fc37a..f5839eba 100644 --- a/ytmusicapi/mixins/watch.py +++ b/ytmusicapi/mixins/watch.py @@ -1,19 +1,20 @@ -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from ytmusicapi.continuations import get_continuations +from ytmusicapi.mixins._protocol import MixinProtocol from ytmusicapi.parsers.playlists import validate_playlist_id from ytmusicapi.parsers.watch import * -class WatchMixin: +class WatchMixin(MixinProtocol): def get_watch_playlist( self, - videoId: str = None, - playlistId: str = None, + videoId: Optional[str] = None, + playlistId: Optional[str] = None, limit=25, radio: bool = False, shuffle: bool = False, - ) -> Dict[str, Union[List[Dict]]]: + ) -> Dict[str, Union[List[Dict], str, None]]: """ Get a watch list of tracks. This watch playlist appears when you press play on a track in YouTube Music. @@ -120,8 +121,12 @@ def get_watch_playlist( "musicVideoType": "MUSIC_VIDEO_TYPE_ATV", } } - body["playlistId"] = validate_playlist_id(playlistId) - is_playlist = body["playlistId"].startswith("PL") or body["playlistId"].startswith("OLA") + is_playlist = False + if playlistId: + playlist_id = validate_playlist_id(playlistId) + is_playlist = playlist_id.startswith("PL") or playlist_id.startswith("OLA") + body["playlistId"] = playlist_id + if shuffle and playlistId is not None: body["params"] = "wAEB8gECKAE%3D" if radio: diff --git a/ytmusicapi/navigation.py b/ytmusicapi/navigation.py index 6b2bfd8c..4a0411ec 100644 --- a/ytmusicapi/navigation.py +++ b/ytmusicapi/navigation.py @@ -1,4 +1,6 @@ # commonly used navigation paths +from typing import Any, Dict, List + CONTENT = ["contents", 0] RUN_TEXT = ["runs", 0, "text"] TAB_CONTENT = ["tabs", 0, "tabRenderer", "content"] @@ -67,7 +69,7 @@ FRAMEWORK_MUTATIONS = ["frameworkUpdates", "entityBatchUpdate", "mutations"] -def nav(root, items, none_if_absent=False): +def nav(root: Dict[str, Any], items: List[Any], none_if_absent: bool = False) -> Any: """Access a nested object in root by item sequence.""" try: for k in items: diff --git a/ytmusicapi/parsers/i18n.py b/ytmusicapi/parsers/i18n.py index 289a4571..f8083044 100644 --- a/ytmusicapi/parsers/i18n.py +++ b/ytmusicapi/parsers/i18n.py @@ -1,4 +1,4 @@ -from typing import Dict, List +from typing import Any, Dict, List from ytmusicapi.navigation import CAROUSEL, CAROUSEL_TITLE, NAVIGATION_BROWSE_ID, nav from ytmusicapi.parsers._utils import i18n @@ -32,9 +32,9 @@ def get_search_result_types(self): @i18n def parse_artist_contents(self, results: List) -> Dict: categories = ["albums", "singles", "videos", "playlists", "related"] - categories_local = [_("albums"), _("singles"), _("videos"), _("playlists"), _("related")] + categories_local = [_("albums"), _("singles"), _("videos"), _("playlists"), _("related")] # type: ignore[name-defined] categories_parser = [parse_album, parse_single, parse_video, parse_playlist, parse_related_artist] - artist = {} + artist: Dict[str, Any] = {} for i, category in enumerate(categories): data = [ r["musicCarouselShelfRenderer"] diff --git a/ytmusicapi/parsers/playlists.py b/ytmusicapi/parsers/playlists.py index 59c5ee2d..75add749 100644 --- a/ytmusicapi/parsers/playlists.py +++ b/ytmusicapi/parsers/playlists.py @@ -1,10 +1,9 @@ -from typing import List +from typing import List, Optional -from ._utils import * from .songs import * -def parse_playlist_items(results, menu_entries: List[List] = None): +def parse_playlist_items(results, menu_entries: Optional[List[List]] = None): songs = [] for result in results: if MRLIR not in result: @@ -110,5 +109,5 @@ def parse_playlist_items(results, menu_entries: List[List] = None): return songs -def validate_playlist_id(playlistId): +def validate_playlist_id(playlistId: str) -> str: return playlistId if not playlistId.startswith("VL") else playlistId[2:] diff --git a/ytmusicapi/parsers/watch.py b/ytmusicapi/parsers/watch.py index 8f28dd6f..eac07779 100644 --- a/ytmusicapi/parsers/watch.py +++ b/ytmusicapi/parsers/watch.py @@ -1,8 +1,9 @@ -from ._utils import * +from typing import Any, Dict, List + from .songs import * -def parse_watch_playlist(results): +def parse_watch_playlist(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: tracks = [] PPVWR = "playlistPanelVideoWrapperRenderer" PPVR = "playlistPanelVideoRenderer" diff --git a/ytmusicapi/setup.py b/ytmusicapi/setup.py index 67da6de9..47d68bc5 100644 --- a/ytmusicapi/setup.py +++ b/ytmusicapi/setup.py @@ -1,15 +1,15 @@ import argparse import sys from pathlib import Path -from typing import Dict +from typing import Optional import requests from ytmusicapi.auth.browser import setup_browser -from ytmusicapi.auth.oauth import OAuthCredentials +from ytmusicapi.auth.oauth import OAuthCredentials, RefreshingToken -def setup(filepath: str = None, headers_raw: str = None) -> Dict: +def setup(filepath: Optional[str] = None, headers_raw: Optional[str] = None) -> str: """ Requests browser headers from the user via command line and returns a string that can be passed to YTMusic() @@ -23,13 +23,13 @@ def setup(filepath: str = None, headers_raw: str = None) -> Dict: def setup_oauth( - filepath: str = None, - session: requests.Session = None, - proxies: dict = None, + filepath: Optional[str] = None, + session: Optional[requests.Session] = None, + proxies: Optional[dict] = None, open_browser: bool = False, - client_id: str = None, - client_secret: str = None, -) -> Dict: + client_id: Optional[str] = None, + client_secret: Optional[str] = None, +) -> RefreshingToken: """ Starts oauth flow from the terminal and returns a string that can be passed to YTMusic() diff --git a/ytmusicapi/ytmusic.py b/ytmusicapi/ytmusic.py index 870f4a24..81892718 100644 --- a/ytmusicapi/ytmusic.py +++ b/ytmusicapi/ytmusic.py @@ -1,4 +1,6 @@ import gettext +import json +import locale import os import time from contextlib import suppress @@ -6,9 +8,22 @@ from typing import Dict, Optional import requests +from requests import Response from requests.structures import CaseInsensitiveDict -from ytmusicapi.helpers import * +from ytmusicapi.helpers import ( + SUPPORTED_LANGUAGES, + SUPPORTED_LOCATIONS, + USER_AGENT, + YTM_BASE_API, + YTM_DOMAIN, + YTM_PARAMS, + YTM_PARAMS_KEY, + get_authorization, + get_visitor_id, + initialize_context, + sapisid_from_cookie, +) from ytmusicapi.mixins.browsing import BrowsingMixin from ytmusicapi.mixins.explore import ExploreMixin from ytmusicapi.mixins.library import LibraryMixin @@ -35,9 +50,9 @@ class YTMusic( def __init__( self, auth: Optional[str | Dict] = None, - user: str = None, + user: Optional[str] = None, requests_session=True, - proxies: Dict = None, + proxies: Optional[Dict[str, str]] = None, language: str = "en", location: str = "", oauth_credentials: Optional[OAuthCredentials] = None, @@ -84,7 +99,9 @@ def __init__( self._headers = None #: cache formed headers including auth self.auth = auth #: raw auth - self._input_dict = {} #: parsed auth arg value in dictionary format + self._input_dict: CaseInsensitiveDict = ( + CaseInsensitiveDict() + ) #: parsed auth arg value in dictionary format self.auth_type: AuthType = AuthType.UNAUTHORIZED @@ -92,16 +109,16 @@ def __init__( self.oauth_credentials: OAuthCredentials #: Client used for OAuth refreshing self._session: requests.Session #: request session for connection pooling - self.proxies: Dict = proxies #: params for session modification + self.proxies: Optional[Dict[str, str]] = proxies #: params for session modification if isinstance(requests_session, requests.Session): self._session = requests_session else: if requests_session: # Build a new session. self._session = requests.Session() - self._session.request = partial(self._session.request, timeout=30) + self._session.request = partial(self._session.request, timeout=30) # type: ignore[method-assign] else: # Use the Requests API module as a "session". - self._session = requests.api + self._session = requests.api # type: ignore[assignment] # see google cookie docs: https://policies.google.com/technologies/cookies # value from https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/extractor/youtube.py#L502 @@ -110,18 +127,19 @@ def __init__( self.oauth_credentials = ( oauth_credentials if oauth_credentials is not None else OAuthCredentials() ) - auth_filepath = None + auth_filepath: Optional[str] = None if isinstance(self.auth, str): - if os.path.isfile(auth): - with open(auth) as json_file: - auth_filepath = auth + auth_str: str = self.auth + if os.path.isfile(auth_str): + with open(auth_str) as json_file: + auth_filepath = auth_str input_json = json.load(json_file) else: - input_json = json.loads(auth) + input_json = json.loads(auth_str) self._input_dict = CaseInsensitiveDict(input_json) else: - self._input_dict = self.auth + self._input_dict = CaseInsensitiveDict(self.auth) if OAuthToken.is_oauth(self._input_dict): base_token = OAuthToken(**self._input_dict) @@ -210,7 +228,7 @@ def _send_request(self, endpoint: str, body: Dict, additionalParams: str = "") - body.update(self.context) # only required for post requests (?) - if "X-Goog-Visitor-Id" not in self.headers: + if self._headers and "X-Goog-Visitor-Id" not in self._headers: self._headers.update(get_visitor_id(self._send_get_request)) response = self._session.post( @@ -227,7 +245,7 @@ def _send_request(self, endpoint: str, body: Dict, additionalParams: str = "") - raise Exception(message + error) return response_text - def _send_get_request(self, url: str, params: Dict = None): + def _send_get_request(self, url: str, params: Optional[Dict] = None) -> Response: response = self._session.get( url, params=params,