From 29039b5228e61a4305b47fe769604b884d50d28f Mon Sep 17 00:00:00 2001 From: Andrey Nikiforov Date: Thu, 23 May 2024 10:18:55 -0700 Subject: [PATCH] fix missing exception #836 (#839) * bump mypy 1.7.0->1.10.0 * fix missing exception #836 * add hidden imports https://github.com/pyinstaller/pyinstaller/issues/8554 * bump pyinstaller 5.13.2 -> 6.7.0 --- .github/workflows/quality-checks.yml | 2 +- CHANGELOG.md | 2 + pyproject.toml | 5 +- scripts/build_bin_linux | 6 +- scripts/build_bin_macos | 6 +- scripts/build_bin_windows | 6 +- scripts/type_check | 4 +- src/icloudpd/authentication.py | 17 ++- src/icloudpd/autodelete.py | 7 +- src/icloudpd/base.py | 52 ++++--- src/icloudpd/counter.py | 8 +- src/icloudpd/download.py | 5 +- src/icloudpd/email_notifications.py | 2 +- src/icloudpd/exif_datetime.py | 9 +- src/icloudpd/logger.py | 11 +- src/icloudpd/paths.py | 10 +- src/icloudpd/string_helpers.py | 2 +- src/pyicloud_ipd/base.py | 176 +++++++++++---------- src/pyicloud_ipd/cmdline.py | 4 +- src/pyicloud_ipd/exceptions.py | 11 +- src/pyicloud_ipd/services/account.py | 3 +- src/pyicloud_ipd/services/calendar.py | 1 + src/pyicloud_ipd/services/contacts.py | 1 + src/pyicloud_ipd/services/findmyiphone.py | 1 + src/pyicloud_ipd/services/photos.py | 43 +++--- src/pyicloud_ipd/services/reminders.py | 1 + src/pyicloud_ipd/services/ubiquity.py | 1 + src/pyicloud_ipd/utils.py | 12 +- src/starters/icloudpd_ex.py | 18 +-- tests/helpers/__init__.py | 2 +- tests/test_authentication.py | 29 ++-- tests/test_autodelete_photos.py | 38 ++--- tests/test_cli.py | 19 +-- tests/test_download_live_photos.py | 14 +- tests/test_download_photos.py | 177 +++++++++++----------- tests/test_email_notifications.py | 10 +- tests/test_folder_structure.py | 11 +- tests/test_listing_albums.py | 4 +- tests/test_listing_libraries.py | 6 +- tests/test_listing_recent_photos.py | 22 +-- tests/test_logger.py | 30 ++-- tests/test_string_helpers.py | 2 +- tests/test_two_step_auth.py | 12 +- 43 files changed, 427 insertions(+), 375 deletions(-) diff --git a/.github/workflows/quality-checks.yml b/.github/workflows/quality-checks.yml index 1076a343c..2c8348803 100644 --- a/.github/workflows/quality-checks.yml +++ b/.github/workflows/quality-checks.yml @@ -116,7 +116,7 @@ jobs: icloudpd_changelog: ${{steps.get_version.outputs.icloudpd_changelog}} build_package: - needs: [get_version, skip_check] + needs: [get_version, skip_check, type_check, lint, test] if: needs.skip_check.outputs.should_skip != 'true' uses: ./.github/workflows/build-package.yml with: diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c6d9c044..d008d407f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- fix: missing exception [#836](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/836) + ## 1.17.5 (2024-04-27) - experimental: fix errors in npm packages diff --git a/pyproject.toml b/pyproject.toml index 352121d21..ab1316057 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "tqdm==4.66.0", "piexif==1.1.3", "urllib3==1.26.16", + "typing_extensions==4.11.0", # from pyicloud_ipd "six==1.16.0", "tzlocal==4.3.1", @@ -42,7 +43,7 @@ dependencies = [ [project.optional-dependencies] dev = [ "twine==4.0.2", - "pyinstaller==5.13.2", + "pyinstaller==6.7.0", "wheel==0.42.0", ] devlinux = [ @@ -61,7 +62,7 @@ test = [ "autopep8==2.0.2", "pytest-timeout==2.1.0", "pytest-xdist==3.3.1", - "mypy==1.7.0", + "mypy==1.10.0", "types-pytz==2022.7.1.2", "types-tzlocal==4.3.0.0", "types-requests==2.31.0.2", diff --git a/scripts/build_bin_linux b/scripts/build_bin_linux index 5a405ecab..559c5771e 100755 --- a/scripts/build_bin_linux +++ b/scripts/build_bin_linux @@ -5,6 +5,6 @@ set -euo pipefail # expects python with installed dependencies # required params: version plat -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd.py src/starters/icloud.py --name icloudpd-$1-linux-$2 -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloud.py --name icloud-$1-linux-$2 -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd_ex.py --name icloudpd-ex-$1-linux-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd.py src/starters/icloud.py --name icloudpd-$1-linux-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloud.py --name icloud-$1-linux-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd_ex.py --name icloudpd-ex-$1-linux-$2 diff --git a/scripts/build_bin_macos b/scripts/build_bin_macos index 424042ca3..bf0b328ca 100755 --- a/scripts/build_bin_macos +++ b/scripts/build_bin_macos @@ -5,6 +5,6 @@ set -euo pipefail # expects python with installed dependencies # required params: version arch -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd.py src/starters/icloud.py --name icloudpd-$1-macos-$2 -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloud.py --name icloud-$1-macos-$2 -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd_ex.py --name icloudpd-ex-$1-macos-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd.py src/starters/icloud.py --name icloudpd-$1-macos-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloud.py --name icloud-$1-macos-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd_ex.py --name icloudpd-ex-$1-macos-$2 diff --git a/scripts/build_bin_windows b/scripts/build_bin_windows index b0b26fd4d..61cea6b15 100755 --- a/scripts/build_bin_windows +++ b/scripts/build_bin_windows @@ -5,6 +5,6 @@ set -euo pipefail # expects python with installed dependencies # required param: version arch -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd.py src/starters/icloud.py --name icloudpd-$1-windows-$2 -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloud.py --name icloud-$1-windows-$2 -pyinstaller --collect-all keyrings.alt --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd_ex.py --name icloudpd-ex-$1-windows-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd.py src/starters/icloud.py --name icloudpd-$1-windows-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloud.py --name icloud-$1-windows-$2 +pyinstaller --collect-all keyrings.alt --hidden-import pkg_resources.extern --hidden-import pkgutil --collect-all tzdata --onefile src/starters/icloudpd_ex.py --name icloudpd-ex-$1-windows-$2 diff --git a/scripts/type_check b/scripts/type_check index dfb1208ff..f57a6538e 100755 --- a/scripts/type_check +++ b/scripts/type_check @@ -1,5 +1,5 @@ #!/bin/bash set -euo pipefail echo "Running mypy..." -python3 -m mypy src tests -# too strict now: --disallow-any-generics --disallow-untyped-defs --strict-equality --disallow-untyped-calls --warn-return-any \ No newline at end of file +python3 -m mypy src tests --strict-equality --warn-return-any --disallow-any-generics +# too strict now: --disallow-untyped-defs --disallow-untyped-calls --check-untyped-defs diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index 5554cc539..2cb0fac47 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -2,6 +2,7 @@ import logging import sys +from typing import Callable, Optional import click import pyicloud_ipd @@ -13,14 +14,14 @@ class TwoStepAuthRequiredError(Exception): """ -def authenticator(logger: logging.Logger, domain: str): +def authenticator(logger: logging.Logger, domain: str) -> Callable[[str, Optional[str], Optional[str], bool, Optional[str]], pyicloud_ipd.PyiCloudService]: """Wraping authentication with domain context""" def authenticate_( - username, - password, - cookie_directory=None, - raise_error_on_2sa=False, - client_id=None, + username:str, + password:Optional[str], + cookie_directory:Optional[str]=None, + raise_error_on_2sa:bool=False, + client_id:Optional[str]=None, ) -> pyicloud_ipd.PyiCloudService: """Authenticate with iCloud username and password""" logger.debug("Authenticating...") @@ -59,7 +60,7 @@ def authenticate_( return authenticate_ -def request_2sa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger): +def request_2sa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger) -> None: """Request two-step authentication. Prompts for SMS or device""" devices = icloud.trusted_devices devices_count = len(devices) @@ -99,7 +100,7 @@ def request_2sa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger): ) -def request_2fa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger): +def request_2fa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger) -> None: """Request two-factor authentication.""" try: devices = icloud.trusted_devices diff --git a/src/icloudpd/autodelete.py b/src/icloudpd/autodelete.py index 7303e4a6c..df73366b8 100644 --- a/src/icloudpd/autodelete.py +++ b/src/icloudpd/autodelete.py @@ -6,6 +6,7 @@ from tzlocal import get_localzone from icloudpd.paths import local_download_path import pyicloud_ipd +from pyicloud_ipd.services.photos import PhotoLibrary, PhotosService def delete_file(logger: logging.Logger, path: str) -> bool: @@ -24,9 +25,9 @@ def delete_file_dry_run(logger: logging.Logger, path: str) -> bool: def autodelete_photos( logger: logging.Logger, dry_run: bool, - library_object, + library_object: PhotoLibrary, folder_structure: str, - directory: str): + directory: str) -> None: """ Scans the "Recently Deleted" folder and deletes any matching files from the download directory. @@ -51,7 +52,7 @@ def autodelete_photos( for size in ["small", "original", "medium", "thumb"]: path = os.path.normpath( local_download_path( - media, size, download_dir)) + media.filename, size, download_dir)) if os.path.exists(path): logger.debug("Deleting %s...", path) delete_local = delete_file_dry_run if dry_run else delete_file diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index a09a8e5be..4cb77d461 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -10,7 +10,7 @@ from icloudpd.email_notifications import send_2sa_notification from icloudpd import download from icloudpd.authentication import authenticator, TwoStepAuthRequiredError -from pyicloud_ipd.services.photos import PhotoAsset +from pyicloud_ipd.services.photos import PhotoAsset, PhotoLibrary from pyicloud_ipd.exceptions import PyiCloudAPIResponseException from pyicloud_ipd import PyiCloudService from tzlocal import get_localzone @@ -18,7 +18,7 @@ from tqdm import tqdm import click import urllib -from typing import Callable, Optional, TypeVar, cast +from typing import Callable, Iterable, NoReturn, Optional, Sequence, TypeVar, cast import json import subprocess import itertools @@ -247,7 +247,7 @@ # pylint: disable-msg=too-many-branches,too-many-locals def main( directory: Optional[str], - username: Optional[str], + username: str, password: Optional[str], auth_only: bool, cookie_directory: str, @@ -257,8 +257,8 @@ def main( until_found: Optional[int], album: str, list_albums: bool, - library, - list_libraries, + library: str, + list_libraries: bool, skip_videos: bool, skip_live_photos: bool, force_size: bool, @@ -281,7 +281,7 @@ def main( domain: str, watch_with_interval: Optional[int], dry_run: bool -): +) -> NoReturn: """Download all iCloud photos to a local directory""" logging.basicConfig( @@ -470,7 +470,7 @@ def download_photo_(counter: Counter, photo: PhotoAsset) -> bool: download_size = "original" download_path = local_download_path( - photo, download_size, download_dir) + photo.filename, download_size, download_dir) original_download_path = None file_exists = os.path.isfile(download_path) @@ -604,7 +604,7 @@ def download_photo_(counter: Counter, photo: PhotoAsset) -> bool: def delete_photo( logger: logging.Logger, icloud: PyiCloudService, - photo: PhotoAsset): + photo: PhotoAsset) -> None: """Delete a photo from the iCloud account.""" clean_filename_local = clean_filename(photo.filename) logger.debug( @@ -638,7 +638,7 @@ def delete_photo( def delete_photo_dry_run( logger: logging.Logger, _icloud: PyiCloudService, - photo: PhotoAsset): + photo: PhotoAsset) -> None: """Dry run for deleting a photo from the iCloud""" logger.info( "[DRY RUN] Would delete %s in iCloud", @@ -665,9 +665,9 @@ def retrier( raise -def session_error_handle_builder(logger: Logger, icloud: PyiCloudService): +def session_error_handle_builder(logger: Logger, icloud: PyiCloudService) -> Callable[[Exception, int], None]: """Build handler for session error""" - def session_error_handler(ex, attempt): + def session_error_handler(ex: Exception, attempt: int) -> None: """Handles session errors in the PhotoAlbum photos iterator""" if "Invalid global session" in str(ex): if attempt > constants.MAX_RETRIES: @@ -685,7 +685,7 @@ def session_error_handler(ex, attempt): return session_error_handler -def internal_error_handle_builder(logger: logging.Logger): +def internal_error_handle_builder(logger: logging.Logger) -> Callable[[Exception, int], None]: """Build handler for internal error""" def internal_error_handler(ex: Exception, attempt: int) -> None: """Handles session errors in the PhotoAlbum photos iterator""" @@ -702,9 +702,9 @@ def internal_error_handler(ex: Exception, attempt: int) -> None: return internal_error_handler -def compose_handlers(handlers): +def compose_handlers(handlers: Sequence[Callable[[Exception, int], None]]) -> Callable[[Exception, int], None]: """Compose multiple error handlers""" - def composed(ex, retries): + def composed(ex: Exception, retries: int) -> None: for handler in handlers: handler(ex, retries) return composed @@ -716,7 +716,7 @@ def composed(ex, retries): def core( downloader: Callable[[PyiCloudService], Callable[[Counter, PhotoAsset], bool]], directory: Optional[str], - username: Optional[str], + username: str, password: Optional[str], auth_only: bool, cookie_directory: str, @@ -725,8 +725,8 @@ def core( until_found: Optional[int], album: str, list_albums: bool, - library, - list_libraries, + library: str, + list_libraries: bool, skip_videos: bool, auto_delete: bool, only_print_filenames: bool, @@ -745,7 +745,7 @@ def core( logger: logging.Logger, watch_interval: Optional[int], dry_run: bool -): +) -> int: """Download all iCloud photos to a local directory""" raise_error_on_2sa = ( @@ -759,7 +759,7 @@ def core( password, cookie_directory, raise_error_on_2sa, - client_id=os.environ.get("CLIENT_ID"), + os.environ.get("CLIENT_ID"), ) except TwoStepAuthRequiredError: if notification_script is not None: @@ -784,7 +784,7 @@ def core( download_photo = downloader(icloud) # Access to the selected library. Defaults to the primary photos object. - library_object = icloud.photos + library_object: PhotoLibrary = icloud.photos if list_libraries: libraries_dict = icloud.photos.libraries @@ -840,15 +840,17 @@ def core( photos_count: Optional[int] = len(photos) + photos_enumerator: Iterable[PhotoAsset] = photos + # Optional: Only download the x most recent photos. if recent is not None: photos_count = recent - photos = itertools.islice(photos, recent) + photos_enumerator = itertools.islice(photos_enumerator, recent) if until_found is not None: photos_count = None # ensure photos iterator doesn't have a known length - photos = (p for p in photos) + photos_enumerator = (p for p in photos_enumerator) # Skip the one-line progress bar if we're only printing the filenames, # or if the progress bar is explicitly disabled, @@ -856,11 +858,11 @@ def core( skip_bar = not os.environ.get("FORCE_TQDM") and ( only_print_filenames or no_progress_bar or not sys.stdout.isatty()) if skip_bar: - photos_enumerator = photos + photos_enumerator = photos_enumerator # logger.set_tqdm(None) else: photos_enumerator = tqdm( - iterable=photos, + iterable=photos_enumerator, total=photos_count, leave=False, dynamic_ncols=True, @@ -907,7 +909,7 @@ def should_break(counter: Counter) -> bool: consecutive_files_found, item) and delete_after_download: - def delete_cmd(): + def delete_cmd() -> None: delete_local = delete_photo_dry_run if dry_run else delete_photo delete_local(logger, icloud, item) diff --git a/src/icloudpd/counter.py b/src/icloudpd/counter.py index 0ea84b57b..393400fd3 100644 --- a/src/icloudpd/counter.py +++ b/src/icloudpd/counter.py @@ -4,19 +4,19 @@ class Counter(object): - def __init__(self, value=0): + def __init__(self, value:int=0): self.initial_value = value self.val = RawValue('i', value) self.lock = Lock() - def increment(self): + def increment(self) -> None: with self.lock: self.val.value += 1 - def reset(self): + def reset(self) -> None: with self.lock: self.val = RawValue('i', self.initial_value) def value(self) -> int: with self.lock: - return self.val.value + return int(self.val.value) diff --git a/src/icloudpd/download.py b/src/icloudpd/download.py index bfdd53d3f..17b3739f7 100644 --- a/src/icloudpd/download.py +++ b/src/icloudpd/download.py @@ -5,6 +5,7 @@ import socket import time import datetime +from requests import Response from tzlocal import get_localzone from requests.exceptions import ConnectionError # pylint: disable=redefined-builtin import pyicloud_ipd # pylint: disable=redefined-builtin @@ -67,7 +68,7 @@ def mkdirs_for_path_dry_run( def download_response_to_path( _logger: logging.Logger, - response, + response: Response, download_path: str, created_date: datetime.datetime) -> bool: """ Saves response content into file with desired created date """ @@ -83,7 +84,7 @@ def download_response_to_path( def download_response_to_path_dry_run( logger: logging.Logger, - _response, + _response: Response, download_path: str, _created_date: datetime.datetime) -> bool: """ Pretends to save response content into a file with desired created date """ diff --git a/src/icloudpd/email_notifications.py b/src/icloudpd/email_notifications.py index 170c96607..bbac665ca 100644 --- a/src/icloudpd/email_notifications.py +++ b/src/icloudpd/email_notifications.py @@ -16,7 +16,7 @@ def send_2sa_notification( smtp_port: int, smtp_no_tls: bool, to_addr: Optional[str], - from_addr: Optional[str] = None): + from_addr: Optional[str] = None) -> None: """Send an email notification when 2SA is expired""" to_addr = cast(str, to_addr if to_addr is not None else smtp_email) from_addr = from_addr if from_addr is not None else ( diff --git a/src/icloudpd/exif_datetime.py b/src/icloudpd/exif_datetime.py index 74040c287..fa92a3be3 100644 --- a/src/icloudpd/exif_datetime.py +++ b/src/icloudpd/exif_datetime.py @@ -2,21 +2,22 @@ import datetime import logging +import typing import piexif from piexif._exceptions import InvalidImageDataError -def get_photo_exif(logger: logging.Logger, path: str): +def get_photo_exif(logger: logging.Logger, path: str) -> (typing.Optional[datetime.datetime]): """Get EXIF date for a photo, return nothing if there is an error""" try: - exif_dict = piexif.load(path) - return exif_dict.get("Exif").get(36867) + exif_dict: piexif.ExifIFD = piexif.load(path) + return typing.cast(typing.Optional[datetime.datetime], exif_dict.get("Exif").get(36867)) except (ValueError, InvalidImageDataError): logger.debug("Error fetching EXIF data for %s", path) return None -def set_photo_exif(logger: logging.Logger, path: str, date: datetime.datetime): +def set_photo_exif(logger: logging.Logger, path: str, date: datetime.datetime) -> None: """Set EXIF date on a photo, do nothing if there is an error""" try: exif_dict = piexif.load(path) diff --git a/src/icloudpd/logger.py b/src/icloudpd/logger.py index 927a0cbae..c514ae564 100644 --- a/src/icloudpd/logger.py +++ b/src/icloudpd/logger.py @@ -3,28 +3,29 @@ import sys import logging from logging import INFO +from typing import Any class IPDLogger(logging.Logger): """Custom logger class with support for tqdm progress bar""" - def __init__(self, name, level=INFO): + def __init__(self, name:str, level:int=INFO): logging.Logger.__init__(self, name, level) self.tqdm = None # If tdqm progress bar is not set, we just write regular log messages - def set_tqdm(self, tdqm): + def set_tqdm(self, tdqm: Any) -> None: """Sets the tqdm progress bar""" self.tqdm = tdqm - def set_tqdm_description(self, desc, loglevel=INFO): + def set_tqdm_description(self, desc: str, loglevel:int=INFO) -> None: """Set tqdm progress bar description, fallback to logging""" if self.tqdm is None: self.log(loglevel, desc) else: self.tqdm.set_description(desc) - def tqdm_write(self, message, loglevel=INFO): + def tqdm_write(self, message:str, loglevel:int=INFO) -> None: """Write to tqdm progress bar, fallback to logging""" if self.tqdm is None: self.log(loglevel, message) @@ -32,7 +33,7 @@ def tqdm_write(self, message, loglevel=INFO): self.tqdm.write(message) -def setup_logger(): +def setup_logger() -> logging.Logger: """Set up logger and add stdout handler""" logging.setLoggerClass(IPDLogger) logger = logging.getLogger("icloudpd") diff --git a/src/icloudpd/paths.py b/src/icloudpd/paths.py index ad043699f..6d726f415 100644 --- a/src/icloudpd/paths.py +++ b/src/icloudpd/paths.py @@ -2,7 +2,7 @@ import os -def clean_filename(filename): +def clean_filename(filename: str) -> str: """Replaces invalid chars in filenames with '_'""" result = filename.encode( "utf-8").decode("ascii", "ignore") @@ -14,17 +14,17 @@ def clean_filename(filename): return result -def local_download_path(media, size, download_dir): +def local_download_path(filename: str, size: str, download_dir: str) -> str: """Returns the full download path, including size""" - filename = filename_with_size(media, size) + filename = filename_with_size(filename, size) download_path = os.path.join(download_dir, filename) return download_path -def filename_with_size(media, size): +def filename_with_size(filename: str, size: str) -> str: """Returns the filename with size, e.g. IMG1234.jpg, IMG1234-small.jpg""" # Strip any non-ascii characters. - filename = clean_filename(media.filename) + filename = clean_filename(filename) if size == 'original': return filename return (f"-{size}.").join(filename.rsplit(".", 1)) diff --git a/src/icloudpd/string_helpers.py b/src/icloudpd/string_helpers.py index b7be02192..7a98f2c0d 100644 --- a/src/icloudpd/string_helpers.py +++ b/src/icloudpd/string_helpers.py @@ -1,7 +1,7 @@ """String helper functions""" -def truncate_middle(string: str, length: int): +def truncate_middle(string: str, length: int) -> str: """Truncates a string to a maximum length, inserting "..." in the middle""" if len(string) <= length: return string diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index 736a4796d..5d15c84da 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -1,3 +1,7 @@ +import sys +from typing import Any, Dict, NoReturn, Optional, Sequence +from typing_extensions import override +import typing from uuid import uuid1 import inspect import json @@ -10,6 +14,7 @@ import getpass from pyicloud_ipd.exceptions import ( + PyiCloudConnectionException, PyiCloudFailedLoginException, PyiCloudAPIResponseException, PyiCloud2SARequiredException, @@ -42,14 +47,15 @@ class PyiCloudPasswordFilter(logging.Filter): - def __init__(self, password): + def __init__(self, password: str): super().__init__(password) - def filter(self, record): + @override + def filter(self, record: logging.LogRecord) -> bool: message = record.getMessage() if self.name in message: - record.msg = message.replace(self.name, "*" * 8) - record.args = [] + record.msg = message.replace(self.name, "********") + record.args = [] # type: ignore[assignment] return True @@ -57,10 +63,11 @@ def filter(self, record): class PyiCloudSession(Session): """iCloud session.""" - def __init__(self, service): + def __init__(self, service: Any): self.service = service super().__init__() + @override def request(self, method, url, **kwargs): # pylint: disable=arguments-differ # Charge logging to the right service endpoint @@ -125,18 +132,18 @@ def request(self, method, url, **kwargs): # pylint: disable=arguments-differ if has_retried is None and response.status_code in [421, 450, 500]: api_error = PyiCloudAPIResponseException( - response.reason, response.status_code, retry=True + response.reason, str(response.status_code), True ) request_logger.debug(api_error) kwargs["retried"] = True return self.request(method, url, **kwargs) - self._raise_error(response.status_code, response.reason) + self._raise_error(str(response.status_code), response.reason) if content_type not in json_mimetypes: if self.service.session_data.get("apple_rscd") == "401": - code = "401" - reason = "Invalid username/password combination." + code: Optional[str] = "401" + reason: Optional[str] = "Invalid username/password combination." self._raise_error(code, reason) return response @@ -179,7 +186,7 @@ def request(self, method, url, **kwargs): # pylint: disable=arguments-differ return response - def _raise_error(self, code, reason): + def _raise_error(self, code: str, reason: str) -> NoReturn: if ( self.service.requires_2sa and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie" @@ -190,7 +197,7 @@ def _raise_error(self, code, reason): "Please log into https://icloud.com/ to manually " "finish setting up your iCloud service" ) - api_error = PyiCloudServiceNotActivatedException(reason, code) + api_error: Exception = PyiCloudServiceNotActivatedException(reason, code) LOGGER.error(api_error) raise (api_error) @@ -199,7 +206,7 @@ def _raise_error(self, code, reason): reason + ". Please wait a few minutes then try again." "The remote servers might be trying to throttle requests." ) - if code in [421, 450, 500]: + if code in ["421", "450", "500"]: reason = "Authentication required for Account." api_error = PyiCloudAPIResponseException(reason, code) @@ -219,16 +226,16 @@ class PyiCloudService: """ def __init__( - self, domain, apple_id, password=None, cookie_directory=None, verify=True, - client_id=None, with_family=True, + self, domain:str, apple_id: str, password:Optional[str]=None, cookie_directory:Optional[str]=None, verify:bool=True, + client_id:Optional[str]=None, with_family:bool=True, ): if password is None: password = get_password_from_keyring(apple_id) - self.user = {"accountName": apple_id, "password": password} - self.data = {} - self.params = {} - self.client_id = client_id or ("auth-%s" % str(uuid1()).lower()) + self.user: Dict[str, Any] = {"accountName": apple_id, "password": password} + self.data: Dict[str, Any] = {} + self.params: Dict[str, Any] = {} + self.client_id: str = client_id or ("auth-%s" % str(uuid1()).lower()) self.with_family = with_family self.password_filter = PyiCloudPasswordFilter(password) @@ -268,8 +275,9 @@ def __init__( self.session_data = json.load(session_f) except: # pylint: disable=bare-except LOGGER.info("Session file does not exist") - if self.session_data.get("client_id"): - self.client_id = self.session_data.get("client_id") + session_client_id: Optional[str] = self.session_data.get("client_id") + if session_client_id: + self.client_id = session_client_id else: self.session_data.update({"client_id": self.client_id}) @@ -282,10 +290,10 @@ def __init__( }) cookiejar_path = self.cookiejar_path - self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) + self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) # type: ignore[assignment] if path.exists(cookiejar_path): try: - self.session.cookies.load(ignore_discard=True, ignore_expires=True) + self.session.cookies.load(ignore_discard=True, ignore_expires=True) # type: ignore[attr-defined] LOGGER.debug("Read cookies from %s", cookiejar_path) except: # Most likely a pickled cookiejar from earlier versions. @@ -304,9 +312,9 @@ def __init__( self.authenticate() - self._photos = None + self._photos: Optional[PhotosService] = None - def authenticate(self, force_refresh=False, service=None): + def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) -> None: """ Handles authentication, and persists cookies so that subsequent logins will not cause additional e-mails from Apple. @@ -347,11 +355,13 @@ def authenticate(self, force_refresh=False, service=None): headers = self._get_auth_headers() - if self.session_data.get("scnt"): - headers["scnt"] = self.session_data.get("scnt") - - if self.session_data.get("session_id"): - headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + scnt = self.session_data.get("scnt") + if scnt: + headers["scnt"] = scnt + + session_id = self.session_data.get("session_id") + if session_id: + headers["X-Apple-ID-Session-Id"] = session_id try: self.session.post( @@ -374,7 +384,7 @@ def authenticate(self, force_refresh=False, service=None): LOGGER.info("Authentication completed successfully") LOGGER.debug(self.params) - def _authenticate_with_token(self): + def _authenticate_with_token(self) -> None: """Authenticate using session token.""" data = { "accountCountryCode": self.session_data.get("account_country"), @@ -398,7 +408,7 @@ def _authenticate_with_token(self): msg = f'Apple insists on using {domain_to_use} for your request. Please use --domain parameter' raise PyiCloudConnectionException(msg) - def _authenticate_with_credentials_service(self, service): + def _authenticate_with_credentials_service(self, service: str) -> None: """Authenticate to a specific service using credentials.""" data = { "appName": service, @@ -416,18 +426,19 @@ def _authenticate_with_credentials_service(self, service): msg = "Invalid email/password combination." raise PyiCloudFailedLoginException(msg, error) from error - def _validate_token(self): + def _validate_token(self) -> Dict[str, Any]: """Checks if the current access token is still valid.""" LOGGER.debug("Checking session token validity") try: req = self.session.post("%s/validate" % self.SETUP_ENDPOINT, data="null") LOGGER.debug("Session token is still valid") - return req.json() + result: Dict[str, Any] = req.json() + return result except PyiCloudAPIResponseException as err: LOGGER.debug("Invalid authentication token") raise err - def _get_auth_headers(self, overrides=None): + def _get_auth_headers(self, overrides: Optional[Dict[str, str]]=None) -> Dict[str, str]: headers = { "Accept": "*/*", "Content-Type": "application/json", @@ -445,51 +456,54 @@ def _get_auth_headers(self, overrides=None): return headers @property - def cookiejar_path(self): + def cookiejar_path(self) -> str: """Get path for cookiejar file.""" return path.join( self._cookie_directory, - "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), # type: ignore[union-attr] ) @property - def session_path(self): + def session_path(self) -> str: """Get path for session data file.""" return path.join( self._cookie_directory, - "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) + "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) # type: ignore[union-attr] + ".session", ) @property - def requires_2sa(self): + def requires_2sa(self) -> bool: """Returns True if two-step authentication is required.""" return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and ( self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session ) @property - def requires_2fa(self): + def requires_2fa(self) -> bool: """Returns True if two-factor authentication is required.""" return self.data["dsInfo"].get("hsaVersion", 0) == 2 and ( self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session ) and self.data["dsInfo"].get("hasICloudQualifyingDevice", False) @property - def is_trusted_session(self): + def is_trusted_session(self) -> bool: """Returns True if the session is trusted.""" - return self.data.get("hsaTrustedBrowser", False) + return typing.cast(bool, self.data.get("hsaTrustedBrowser", False)) @property - def trusted_devices(self): + def trusted_devices(self) -> Sequence[Dict[str, Any]]: """ Returns devices trusted for two-step authentication.""" request = self.session.get( '%s/listDevices' % self.SETUP_ENDPOINT, params=self.params ) - return request.json().get('devices') + devices: Optional[Sequence[Dict[str, Any]]] = request.json().get('devices') + if devices: + return devices + return [] - def send_verification_code(self, device): + def send_verification_code(self, device: Dict[str, Any]) -> bool: """ Requests that a verification code is sent to the given device""" data = json.dumps(device) request = self.session.post( @@ -497,9 +511,9 @@ def send_verification_code(self, device): params=self.params, data=data ) - return request.json().get('success', False) + return typing.cast(bool, request.json().get('success', False)) - def validate_verification_code(self, device, code): + def validate_verification_code(self, device: Dict[str, Any], code: str) -> bool: """Verifies a verification code received on a trusted device.""" device.update({"verificationCode": code, "trustBrowser": True}) data = json.dumps(device) @@ -523,17 +537,19 @@ def validate_verification_code(self, device, code): return not self.requires_2sa - def validate_2fa_code(self, code): + def validate_2fa_code(self, code:str) -> bool: """Verifies a verification code received via Apple's 2FA system (HSA2).""" data = {"securityCode": {"code": code}} headers = self._get_auth_headers({"Accept": "application/json"}) - if self.session_data.get("scnt"): - headers["scnt"] = self.session_data.get("scnt") - - if self.session_data.get("session_id"): - headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + scnt = self.session_data.get("scnt") + if scnt: + headers["scnt"] = scnt + + session_id = self.session_data.get("session_id") + if session_id: + headers["X-Apple-ID-Session-Id"] = session_id try: self.session.post( @@ -553,15 +569,17 @@ def validate_2fa_code(self, code): self.trust_session() return not self.requires_2sa - def trust_session(self): + def trust_session(self) -> bool: """Request session trust to avoid user log in going forward.""" headers = self._get_auth_headers() - if self.session_data.get("scnt"): - headers["scnt"] = self.session_data.get("scnt") - - if self.session_data.get("session_id"): - headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") + scnt = self.session_data.get("scnt") + if scnt: + headers["scnt"] = scnt + + session_id = self.session_data.get("session_id") + if session_id: + headers["X-Apple-ID-Session-Id"] = session_id try: self.session.get( @@ -574,42 +592,42 @@ def trust_session(self): LOGGER.error("Session trust failed.") return False - def _get_webservice_url(self, ws_key): + def _get_webservice_url(self, ws_key: str) -> str: """Get webservice URL, raise an exception if not exists.""" if self._webservices.get(ws_key) is None: raise PyiCloudServiceNotActivatedException( "Webservice not available", ws_key ) - return self._webservices[ws_key]["url"] + return typing.cast(str, self._webservices[ws_key]["url"]) @property - def devices(self): + def devices(self): # type: ignore """ Return all devices.""" service_root = self._get_webservice_url("findme") - return FindMyiPhoneServiceManager( + return FindMyiPhoneServiceManager( # type: ignore service_root, self.session, self.params ) @property - def account(self): - service_root = self._gget_webservice_url("account") - return AccountService( + def account(self): # type: ignore + service_root = self._gget_webservice_url("account") # type: ignore + return AccountService( # type: ignore service_root, self.session, self.params ) @property - def iphone(self): + def iphone(self): # type: ignore return self.devices[0] @property - def files(self): + def files(self): # type: ignore if not hasattr(self, '_files'): service_root = self._get_webservice_url("ubiquity") - self._files = UbiquityService( + self._files = UbiquityService( # type: ignore service_root, self.session, self.params @@ -617,7 +635,7 @@ def files(self): return self._files @property - def photos(self): + def photos(self) -> PhotosService: """Gets the 'Photo' service.""" if not self._photos: service_root = self._get_webservice_url("ckdatabasews") @@ -625,29 +643,29 @@ def photos(self): return self._photos @property - def calendar(self): + def calendar(self): # type: ignore service_root = self._get_webservice_url("calendar") - return CalendarService(service_root, self.session, self.params) + return CalendarService(service_root, self.session, self.params)# type: ignore @property - def contacts(self): + def contacts(self): # type: ignore service_root = self._get_webservice_url("contacts") - return ContactsService(service_root, self.session, self.params) + return ContactsService(service_root, self.session, self.params)# type: ignore @property - def reminders(self): + def reminders(self): # type: ignore service_root = self._get_webservice_url("reminders") - return RemindersService(service_root, self.session, self.params) + return RemindersService(service_root, self.session, self.params)# type: ignore - def __unicode__(self): + def __unicode__(self) -> str: return 'iCloud API: %s' % self.user.get('accountName') - def __str__(self): + def __str__(self) -> str: as_unicode = self.__unicode__() if sys.version_info[0] >= 3: return as_unicode else: return as_unicode.encode('ascii', 'ignore') - def __repr__(self): + def __repr__(self) -> str: return '<%s>' % str(self) diff --git a/src/pyicloud_ipd/cmdline.py b/src/pyicloud_ipd/cmdline.py index 83a39df47..fa939d179 100644 --- a/src/pyicloud_ipd/cmdline.py +++ b/src/pyicloud_ipd/cmdline.py @@ -1,4 +1,5 @@ #! /usr/bin/env python +# mypy: ignore-errors """ A Command Line Wrapper to allow easy use of pyicloud for command line scripts, and related. @@ -6,6 +7,7 @@ import argparse import pickle import sys +from typing import NoReturn from click import confirm @@ -28,7 +30,7 @@ def create_pickled_data(idevice, filename): pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL) -def main(args=None): +def main(args=None) -> NoReturn: """Main commandline entrypoint.""" if args is None: args = sys.argv[1:] diff --git a/src/pyicloud_ipd/exceptions.py b/src/pyicloud_ipd/exceptions.py index c14be60a0..7aa02c350 100644 --- a/src/pyicloud_ipd/exceptions.py +++ b/src/pyicloud_ipd/exceptions.py @@ -1,4 +1,7 @@ +from typing import Optional + + class PyiCloudException(Exception): """Generic iCloud exception.""" pass @@ -7,7 +10,7 @@ class PyiCloudException(Exception): #API class PyiCloudAPIResponseException(PyiCloudException): """iCloud response exception.""" - def __init__(self, reason, code=None, retry=False): + def __init__(self, reason:str, code:Optional[str]=None, retry:bool=False): self.reason = reason self.code = code message = reason or "" @@ -32,7 +35,7 @@ class PyiCloudFailedLoginException(PyiCloudException): class PyiCloud2SARequiredException(PyiCloudException): """iCloud 2SA required exception.""" - def __init__(self, apple_id): + def __init__(self, apple_id: str): message = "Two-step authentication required for account: %s" % apple_id super().__init__(message) @@ -53,7 +56,7 @@ class PyiCloudConnectionException(PyiCloudException): class PyiCloudAPIResponseError(PyiCloudException): - def __init__(self, reason, code): + def __init__(self, reason:str, code:(Optional[int])): self.reason = reason self.code = code message = reason @@ -64,7 +67,7 @@ def __init__(self, reason, code): class PyiCloud2SARequiredError(PyiCloudException): - def __init__(self, url): + def __init__(self, url: str): message = "Two-step authentication required for %s" % url super(PyiCloud2SARequiredError, self).__init__(message) diff --git a/src/pyicloud_ipd/services/account.py b/src/pyicloud_ipd/services/account.py index f172bd71e..3e424daad 100644 --- a/src/pyicloud_ipd/services/account.py +++ b/src/pyicloud_ipd/services/account.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors import sys import six @@ -29,7 +30,7 @@ def devices(self): @six.python_2_unicode_compatible -class AccountDevice(dict): +class AccountDevice(dict): # type: ignore[type-arg] def __init__(self, device_info): super(AccountDevice, self).__init__(device_info) diff --git a/src/pyicloud_ipd/services/calendar.py b/src/pyicloud_ipd/services/calendar.py index 49cf11889..a849f8b54 100644 --- a/src/pyicloud_ipd/services/calendar.py +++ b/src/pyicloud_ipd/services/calendar.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors from __future__ import absolute_import from datetime import datetime, timedelta from calendar import monthrange diff --git a/src/pyicloud_ipd/services/contacts.py b/src/pyicloud_ipd/services/contacts.py index a5a79e08a..7ad1c02d0 100644 --- a/src/pyicloud_ipd/services/contacts.py +++ b/src/pyicloud_ipd/services/contacts.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors from __future__ import absolute_import import os import uuid diff --git a/src/pyicloud_ipd/services/findmyiphone.py b/src/pyicloud_ipd/services/findmyiphone.py index a2a468f9d..e2cd17fdb 100644 --- a/src/pyicloud_ipd/services/findmyiphone.py +++ b/src/pyicloud_ipd/services/findmyiphone.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors import json import sys diff --git a/src/pyicloud_ipd/services/photos.py b/src/pyicloud_ipd/services/photos.py index 1227aca42..afd352287 100644 --- a/src/pyicloud_ipd/services/photos.py +++ b/src/pyicloud_ipd/services/photos.py @@ -5,6 +5,10 @@ import re from datetime import datetime +from typing import Any, Callable, Dict, Optional, Sequence +import typing + +from requests import Response from pyicloud_ipd.exceptions import PyiCloudServiceNotActivatedException from pyicloud_ipd.exceptions import PyiCloudAPIResponseException @@ -142,7 +146,7 @@ def __init__(self, service, zone_id): self.service = service self.zone_id = zone_id - self._albums = None + self._albums: Optional[Dict[str, PhotoAlbum]] = None url = ('%s/records/query?%s' % (self.service._service_endpoint, urlencode(self.service.params))) @@ -164,10 +168,10 @@ def __init__(self, service, zone_id): 'again in a few minutes'), None) @property - def albums(self): + def albums(self) -> Dict[str, "PhotoAlbum"]: if not self._albums: self._albums = { - name: PhotoAlbum(self.service, name, zone_id=self.zone_id, **props) + name: PhotoAlbum(self.service, name, zone_id=self.zone_id, **props) # type: ignore[arg-type] for (name, props) in self.SMART_FOLDERS.items() } @@ -201,7 +205,7 @@ def albums(self): return self._albums - def _fetch_folders(self): + def _fetch_folders(self) -> Sequence[Dict[str, Any]]: url = ('%s/records/query?%s' % (self.service._service_endpoint, urlencode(self.service.params))) json_data = json.dumps({ @@ -216,10 +220,10 @@ def _fetch_folders(self): ) response = request.json() - return response['records'] + return typing.cast(Sequence[Dict[str, Any]], response['records']) @property - def all(self): + def all(self) -> "PhotoAlbum": return self.albums['All Photos'] @@ -228,7 +232,7 @@ class PhotosService(PhotoLibrary): This also acts as a way to access the user's primary library. """ - def __init__(self, service_root, session, params): + def __init__(self, service_root: str, session: Any, params: Dict[str, Any]): self.session = session self.params = dict(params) self._service_root = service_root @@ -236,7 +240,7 @@ def __init__(self, service_root, session, params): ('%s/database/1/com.apple.photos.cloud/production/private' % self._service_root) - self._libraries = None + self._libraries: Optional[Dict[str, PhotoLibrary]] = None self.params.update({ 'remapEnums': True, @@ -249,13 +253,13 @@ def __init__(self, service_root, session, params): # 'clientInstanceId': self.params.pop('clientId') # }) - self._photo_assets = {} + # self._photo_assets = {} super(PhotosService, self).__init__( service=self, zone_id={u'zoneName': u'PrimarySync'}) @property - def libraries(self): + def libraries(self) -> Dict[str, PhotoLibrary]: if not self._libraries: try: url = ('%s/zones/list' % @@ -297,6 +301,7 @@ def __init__(self, service, name, list_type, obj_type, direction, self.direction = direction self.query_filter = query_filter self.page_size = page_size + self.exception_handler: Optional[Callable[[Exception, int], None]] = None self._len = None @@ -331,16 +336,16 @@ def __len__(self): # Perform the request in a separate method so that we # can mock it to test session errors. - def photos_request(self, offset): + def photos_request(self, offset: int) -> Response: url = ('%s/records/query?' % self.service._service_endpoint) + \ urlencode(self.service.params) - return self.service.session.post( + return typing.cast(Response, self.service.session.post( url, data=json.dumps(self._list_query_gen( offset, self.list_type, self.direction, self.query_filter)), headers={'Content-type': 'text/plain'} - ) + )) @property @@ -427,7 +432,7 @@ def _count_query_gen(self, obj_type): return query - def _list_query_gen(self, offset, list_type, direction, query_filter=None): + def _list_query_gen(self, offset: int, list_type: str, direction: str, query_filter=None): query = { u'query': { u'filterBy': [ @@ -513,7 +518,7 @@ def __init__(self, service, master_record, asset_record): self._master_record = master_record self._asset_record = asset_record - self._versions = None + self._versions: Optional[Dict[str, Dict[str, Any]]] = None ITEM_TYPES = { u"public.heic": u"image", @@ -615,7 +620,7 @@ def item_type_extension(self): return 'unknown' @property - def versions(self): + def versions(self) -> Dict[str, Dict[str, Any]]: if not self._versions: self._versions = {} if self.item_type == "movie": @@ -669,15 +674,15 @@ def versions(self): return self._versions - def download(self, version='original', **kwargs): + def download(self, version='original', **kwargs) -> Optional[Response]: if version not in self.versions: return None - return self._service.session.get( + return typing.cast(Response, self._service.session.get( self.versions[version]['url'], stream=True, **kwargs - ) + )) def __repr__(self): return "<%s: id=%s>" % ( diff --git a/src/pyicloud_ipd/services/reminders.py b/src/pyicloud_ipd/services/reminders.py index 12e8ba23e..3b620e619 100644 --- a/src/pyicloud_ipd/services/reminders.py +++ b/src/pyicloud_ipd/services/reminders.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors from __future__ import absolute_import from datetime import datetime, timedelta import time diff --git a/src/pyicloud_ipd/services/ubiquity.py b/src/pyicloud_ipd/services/ubiquity.py index 30c7ec598..09fd0550f 100644 --- a/src/pyicloud_ipd/services/ubiquity.py +++ b/src/pyicloud_ipd/services/ubiquity.py @@ -1,3 +1,4 @@ +# mypy: ignore-errors from datetime import datetime import sys diff --git a/src/pyicloud_ipd/utils.py b/src/pyicloud_ipd/utils.py index f7fc53133..3da72c98b 100644 --- a/src/pyicloud_ipd/utils.py +++ b/src/pyicloud_ipd/utils.py @@ -7,7 +7,7 @@ KEYRING_SYSTEM = 'pyicloud://icloud-password' -def get_password(username, interactive=sys.stdout.isatty()): +def get_password(username:str, interactive:bool=sys.stdout.isatty()) -> str: try: return get_password_from_keyring(username) except PyiCloudNoStoredPasswordAvailableException: @@ -21,7 +21,7 @@ def get_password(username, interactive=sys.stdout.isatty()): ) -def password_exists_in_keyring(username): +def password_exists_in_keyring(username:str) -> bool: try: get_password_from_keyring(username) except PyiCloudNoStoredPasswordAvailableException: @@ -30,7 +30,7 @@ def password_exists_in_keyring(username): return True -def get_password_from_keyring(username): +def get_password_from_keyring(username:str) -> str: result = keyring.get_password( KEYRING_SYSTEM, username @@ -48,7 +48,7 @@ def get_password_from_keyring(username): return result -def store_password_in_keyring(username, password): +def store_password_in_keyring(username: str, password:str) -> None: return keyring.set_password( KEYRING_SYSTEM, username, @@ -56,14 +56,14 @@ def store_password_in_keyring(username, password): ) -def delete_password_in_keyring(username): +def delete_password_in_keyring(username:str) -> None: return keyring.delete_password( KEYRING_SYSTEM, username, ) -def underscore_to_camelcase(word, initial_capital=False): +def underscore_to_camelcase(word:str , initial_capital: bool=False) -> str: words = [x.capitalize() or '_' for x in word.split('_')] if not initial_capital: words[0] = words[0].lower() diff --git a/src/starters/icloudpd_ex.py b/src/starters/icloudpd_ex.py index e85b7a33b..7e08a0ed5 100644 --- a/src/starters/icloudpd_ex.py +++ b/src/starters/icloudpd_ex.py @@ -26,7 +26,7 @@ @click.group() -def commands(): +def commands() -> None: pass @@ -59,7 +59,7 @@ def commands(): default="com", help="Root Domain for requests to iCloud. com or cn", ) -def icloud(username, password, non_interactive, delete_from_keyring, domain): +def icloud(_username:str, _password:str, _non_interactive:bool, _delete_from_keyring:bool, _domain:str) -> None: """Legacy iCloud utils (keyring)""" # raise Exception("blah") icloud_main(sys.argv[2:]) @@ -68,38 +68,38 @@ def icloud(username, password, non_interactive, delete_from_keyring, domain): @commands.command() @click.argument('appleid') # , help="AppleID of the account to use") @click.argument('target') # , help="Target path template") -def copy(appleid, target): +def copy(_appleid:str, _target:str) -> None: """Copy assets from iCloud to local storage""" @commands.command() -def move(): +def move() -> None: """Move assets from iCloud to local storage""" @commands.group() -def auth(): +def auth() -> None: """Manages persistant credentials""" @auth.command() @click.argument('appleid') # , help="AppleID of the account to use") -def add(appleid): +def add(_appleid:str) -> None: """Add credentials to keyring""" @auth.command() @click.argument('appleid') # , help="AppleID of the account to use") -def delete(appleid): +def delete(_appleid:str) -> None: """Delete credentials from keyring""" @commands.group() -def watch(): +def watch() -> None: """Watch for iCloud changes""" -def main(): +def main() -> None: commands.add_command(icloudpd_main, name="icloudpd") watch.add_command(copy) watch.add_command(move) diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py index 510951b9b..94269f182 100644 --- a/tests/helpers/__init__.py +++ b/tests/helpers/__init__.py @@ -4,7 +4,7 @@ from click.testing import Result -def print_result_exception(result: Result): +def print_result_exception(result: Result) -> None: ex = result.exception if ex: # This only works on Python 3 diff --git a/tests/test_authentication.py b/tests/test_authentication.py index 023528be5..c42f0c3e8 100644 --- a/tests/test_authentication.py +++ b/tests/test_authentication.py @@ -17,13 +17,13 @@ class AuthenticationTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_failed_auth(self): + def test_failed_auth(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -37,14 +37,15 @@ def test_failed_auth(self): authenticator(setup_logger(), "com")( "bad_username", "bad_password", - cookie_directory=cookie_dir, - client_id="EC5646DE-9423-11E8-BF21-14109FE0B321", + cookie_dir, + False, + "EC5646DE-9423-11E8-BF21-14109FE0B321", ) self.assertTrue( "Invalid email/password combination." in str(context.exception)) - def test_2sa_required(self): + def test_2sa_required(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -60,9 +61,9 @@ def test_2sa_required(self): authenticator(setup_logger(), "com")( "jdoe@gmail.com", "password1", - raise_error_on_2sa=True, - cookie_directory=cookie_dir, - client_id="DE309E26-942E-11E8-92F5-14109FE0B321", + cookie_dir, + True, + "DE309E26-942E-11E8-92F5-14109FE0B321", ) self.assertTrue( @@ -70,7 +71,7 @@ def test_2sa_required(self): in str(context.exception) ) - def test_2fa_required(self): + def test_2fa_required(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -86,9 +87,9 @@ def test_2fa_required(self): authenticator(setup_logger(), "com")( "jdoe@gmail.com", "password1", - raise_error_on_2sa=True, - cookie_directory=cookie_dir, - client_id="EC5646DE-9423-11E8-BF21-14109FE0B321", + cookie_dir, + True, + "EC5646DE-9423-11E8-BF21-14109FE0B321", ) self.assertTrue( @@ -96,7 +97,7 @@ def test_2fa_required(self): in str(context.exception) ) - def test_successful_token_validation(self): + def test_successful_token_validation(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -144,7 +145,7 @@ def test_successful_token_validation(self): self.assertIn("INFO Authentication completed successfully", self._caplog.text) assert result.exit_code == 0 - def test_password_prompt(self): + def test_password_prompt(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") diff --git a/tests/test_autodelete_photos.py b/tests/test_autodelete_photos.py index 2ba1b5094..fde95a061 100644 --- a/tests/test_autodelete_photos.py +++ b/tests/test_autodelete_photos.py @@ -1,3 +1,5 @@ +import logging +from typing import Any, NoReturn, Optional from unittest import TestCase from icloudpd import constants from vcr import VCR @@ -23,13 +25,13 @@ class AutodeletePhotosTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_autodelete_invalid_creation_date(self): + def test_autodelete_invalid_creation_date(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -45,7 +47,7 @@ def test_autodelete_invalid_creation_date(self): # Can't mock `astimezone` because it's a readonly property, so have to # create a new class that inherits from datetime.datetime class NewDateTime(datetime.datetime): - def astimezone(self, tz=None): + def astimezone(self, _tz:(Optional[Any])=None) -> NoReturn: raise ValueError('Invalid date') dt_mock.return_value = NewDateTime(2018, 1, 1, 0, 0, 0) @@ -140,7 +142,7 @@ def astimezone(self, tz=None): assert not os.path.exists( os.path.join(data_dir, file_name)), f"{file_name} not expected, but present" - def test_download_autodelete_photos(self): + def test_download_autodelete_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -238,7 +240,7 @@ def test_download_autodelete_photos(self): assert not os.path.exists(os.path.join( data_dir, file_name)), f"{file_name} not expected, but present" - def test_autodelete_photos(self): + def test_autodelete_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -341,7 +343,7 @@ def test_autodelete_photos(self): assert not os.path.exists(os.path.join( data_dir, file_name)), f"{file_name} not expected, but present" - def test_retry_delete_after_download_session_error(self): + def test_retry_delete_after_download_session_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -355,11 +357,11 @@ def test_retry_delete_after_download_session_error(self): with vcr.use_cassette(os.path.join(self.vcr_path, "download_autodelete_photos.yml")): - def mock_raise_response_error(a0_, a1_, a2_): + def mock_raise_response_error(a0_:logging.Logger, a1_:PyiCloudService, a2_:PhotoAsset) -> None: if not hasattr(self, f"already_raised_session_exception{inspect.stack()[0][3]}"): setattr(self, f"already_raised_session_exception{inspect.stack()[0][3]}", True) raise PyiCloudAPIResponseException( - "Invalid global session", 100) + "Invalid global session", "100") with mock.patch("time.sleep") as sleep_mock: with mock.patch("icloudpd.base.delete_photo") as pa_delete: @@ -369,7 +371,7 @@ def mock_raise_response_error(a0_, a1_, a2_): # but do nothing on the second try. orig_authenticate = PyiCloudService.authenticate - def mocked_authenticate(self): + def mocked_authenticate(self:PyiCloudService) -> None: if not hasattr(self, f"already_authenticated{inspect.stack()[0][3]}"): orig_authenticate(self) setattr(self, f"already_authenticated{inspect.stack()[0][3]}", True) @@ -434,7 +436,7 @@ def mocked_authenticate(self): assert sum(1 for _ in files_in_result) == 1 - def test_retry_fail_delete_after_download_session_error(self): + def test_retry_fail_delete_after_download_session_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -448,8 +450,8 @@ def test_retry_fail_delete_after_download_session_error(self): with vcr.use_cassette(os.path.join(self.vcr_path, "download_autodelete_photos.yml")): - def mock_raise_response_error(a0_, a1_, a2_): - raise PyiCloudAPIResponseException("Invalid global session", 100) + def mock_raise_response_error(a0_:logging.Logger, a1_:PyiCloudService, a2_:PhotoAsset) -> None: + raise PyiCloudAPIResponseException("Invalid global session", "100") with mock.patch("time.sleep") as sleep_mock: with mock.patch("icloudpd.base.delete_photo") as pa_delete: @@ -459,7 +461,7 @@ def mock_raise_response_error(a0_, a1_, a2_): # but do nothing on the second try. orig_authenticate = PyiCloudService.authenticate - def mocked_authenticate(self): + def mocked_authenticate(self:PyiCloudService) -> None: if not hasattr(self, f"already_authenticated{inspect.stack()[0][3]}"): orig_authenticate(self) setattr(self, f"already_authenticated{inspect.stack()[0][3]}", True) @@ -524,7 +526,7 @@ def mocked_authenticate(self): assert sum(1 for _ in files_in_result) == 1 - def test_retry_delete_after_download_internal_error(self): + def test_retry_delete_after_download_internal_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -538,7 +540,7 @@ def test_retry_delete_after_download_internal_error(self): with vcr.use_cassette(os.path.join(self.vcr_path, "download_autodelete_photos.yml")): - def mock_raise_response_error(a0_, a1_, a2_): + def mock_raise_response_error(a0_:logging.Logger, a1_:PyiCloudService, a2_:PhotoAsset) -> None: if not hasattr(self, f"already_raised_session_exception{inspect.stack()[0][3]}"): setattr(self, f"already_raised_session_exception{inspect.stack()[0][3]}", True) raise PyiCloudAPIResponseException( @@ -605,7 +607,7 @@ def mock_raise_response_error(a0_, a1_, a2_): assert sum(1 for _ in files_in_result) == 1 - def test_retry_fail_delete_after_download_internal_error(self): + def test_retry_fail_delete_after_download_internal_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -619,7 +621,7 @@ def test_retry_fail_delete_after_download_internal_error(self): with vcr.use_cassette(os.path.join(self.vcr_path, "download_autodelete_photos.yml")): - def mock_raise_response_error(a0_, a1_, a2_): + def mock_raise_response_error(a0_:logging.Logger, a1_:PyiCloudService, a2_:PhotoAsset) -> None: raise PyiCloudAPIResponseException("INTERNAL_ERROR", "INTERNAL_ERROR") with mock.patch("time.sleep") as sleep_mock: @@ -683,7 +685,7 @@ def mock_raise_response_error(a0_, a1_, a2_): assert sum(1 for _ in files_in_result) == 1 - def test_autodelete_photos_dry_run(self): + def test_autodelete_photos_dry_run(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") diff --git a/tests/test_cli.py b/tests/test_cli.py index a115f3026..29d0d5165 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,4 +1,5 @@ # coding=utf-8 +from typing import Sequence, Tuple from unittest import TestCase import os import shutil @@ -16,18 +17,18 @@ class CliTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_cli(self): + def test_cli(self) -> None: runner = CliRunner() result = runner.invoke(main, ["--help"]) assert result.exit_code == 0 - def test_log_levels(self): + def test_log_levels(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -35,7 +36,7 @@ def test_log_levels(self): for dir in [base_dir, cookie_dir, data_dir]: recreate_path(dir) - parameters = [ + parameters: Sequence[Tuple[str, Sequence[str], Sequence[str]]] = [ ("debug", ["DEBUG", "INFO"], []), ("info", ["INFO"], ["DEBUG"]), ("error", [], ["DEBUG", "INFO"]), @@ -75,7 +76,7 @@ def test_log_levels(self): assert sum(1 for _ in files_in_result) == 0 - def test_tqdm(self): + def test_tqdm(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -112,7 +113,7 @@ def test_tqdm(self): assert sum(1 for _ in files_in_result) == 0 - def test_unicode_directory(self): + def test_unicode_directory(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -148,7 +149,7 @@ def test_unicode_directory(self): assert sum(1 for _ in files_in_result) == 0 - def test_missing_directory(self): + def test_missing_directory(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) # need path removed if os.path.exists(base_dir): @@ -176,7 +177,7 @@ def test_missing_directory(self): assert sum(1 for _ in files_in_result) == 0 - def test_missing_directory_param(self): + def test_missing_directory_param(self) -> None: runner = CliRunner() result = runner.invoke( main, @@ -193,7 +194,7 @@ def test_missing_directory_param(self): ) assert result.exit_code == 2 - def test_conflict_options_delete_after_download_and_auto_delete(self): + def test_conflict_options_delete_after_download_and_auto_delete(self) -> None: runner = CliRunner() result = runner.invoke( main, diff --git a/tests/test_download_live_photos.py b/tests/test_download_live_photos.py index 4b1b4c8c1..f99e2e8f0 100644 --- a/tests/test_download_live_photos.py +++ b/tests/test_download_live_photos.py @@ -23,13 +23,13 @@ class DownloadLivePhotoTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_skip_existing_downloads_for_live_photos(self): + def test_skip_existing_downloads_for_live_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -61,7 +61,7 @@ def test_skip_existing_downloads_for_live_photos(self): "3", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -94,7 +94,7 @@ def test_skip_existing_downloads_for_live_photos(self): for file_name in files_to_download: assert os.path.exists(os.path.join(data_dir, os.path.normpath(file_name))), f"file {file_name} expected, but not found" - def test_skip_existing_live_photodownloads(self): + def test_skip_existing_live_photodownloads(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -137,7 +137,7 @@ def test_skip_existing_live_photodownloads(self): "3", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -182,7 +182,7 @@ def test_skip_existing_live_photodownloads(self): for file_name in files_to_download + ([file_name for (file_name, _) in files_to_create]): assert os.path.exists(os.path.join(data_dir, os.path.normpath(file_name))), f"file {file_name} expected, but not found" - def test_skip_existing_live_photo_print_filenames(self): + def test_skip_existing_live_photo_print_filenames(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -225,7 +225,7 @@ def test_skip_existing_live_photo_print_filenames(self): "3", "--no-progress-bar", "--threads-num", - 1, + "1", "--only-print-filenames", "-d", data_dir, diff --git a/tests/test_download_photos.py b/tests/test_download_photos.py index 5495b0bac..2aef38b1a 100644 --- a/tests/test_download_photos.py +++ b/tests/test_download_photos.py @@ -1,4 +1,7 @@ +import logging +from typing import Any, Callable, List, NoReturn, Optional, Sequence from unittest import TestCase +from requests import Response from vcr import VCR import os import sys @@ -25,13 +28,13 @@ class DownloadPhotoTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_download_and_skip_existing_photos(self): + def test_download_and_skip_existing_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -72,7 +75,7 @@ def test_download_and_skip_existing_photos(self): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -136,7 +139,7 @@ def test_download_and_skip_existing_photos(self): "2018-07-31 07:22:24", photo_modified_time.strftime('%Y-%m-%d %H:%M:%S')) - def test_download_photos_and_set_exif(self): + def test_download_photos_and_set_exif(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -165,9 +168,9 @@ def test_download_photos_and_set_exif(self): # Download the first photo, but mock the video download orig_download = PhotoAsset.download - def mocked_download(self, size): + def mocked_download(pa: PhotoAsset, size:str) -> Optional[Response]: if not hasattr(PhotoAsset, "already_downloaded"): - response = orig_download(self, size) + response = orig_download(pa, size) setattr(PhotoAsset, "already_downloaded", True) return response return mock.MagicMock() @@ -196,7 +199,7 @@ def mocked_download(self, size): # "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -239,7 +242,7 @@ def mocked_download(self, size): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_download_photos_and_get_exif_exceptions(self): + def test_download_photos_and_get_exif_exceptions(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -273,7 +276,7 @@ def test_download_photos_and_get_exif_exceptions(self): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -314,7 +317,7 @@ def test_download_photos_and_get_exif_exceptions(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_skip_existing_downloads(self): + def test_skip_existing_downloads(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -327,7 +330,7 @@ def test_skip_existing_downloads(self): ("2018/07/31/IMG_7409.MOV", 3294075), ] - files_to_download = [ + files_to_download: List[str] = [ ] os.makedirs(os.path.join(data_dir, "2018/07/31/")) @@ -353,7 +356,7 @@ def test_skip_existing_downloads(self): # "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -392,7 +395,7 @@ def test_skip_existing_downloads(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_until_found(self): + def test_until_found(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -453,7 +456,7 @@ def test_until_found(self): "20", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -508,7 +511,7 @@ def test_until_found(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_handle_io_error(self): + def test_handle_io_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -537,7 +540,7 @@ def test_handle_io_error(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -566,7 +569,7 @@ def test_handle_io_error(self): assert sum(1 for _ in files_in_result) == 0 - def test_handle_session_error_during_download(self): + def test_handle_session_error_during_download(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -576,8 +579,8 @@ def test_handle_session_error_during_download(self): with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): - def mock_raise_response_error(arg): - raise PyiCloudAPIResponseException("Invalid global session", 100) + def mock_raise_response_error(_arg: Any) -> NoReturn: + raise PyiCloudAPIResponseException("Invalid global session", "100") with mock.patch("time.sleep") as sleep_mock: with mock.patch.object(PhotoAsset, "download") as pa_download: @@ -587,7 +590,7 @@ def mock_raise_response_error(arg): # but do nothing on the second try. orig_authenticate = PyiCloudService.authenticate - def mocked_authenticate(self): + def mocked_authenticate(self: PyiCloudService) -> None: if not hasattr(self, "already_authenticated"): orig_authenticate(self) setattr(self, "already_authenticated", True) @@ -612,7 +615,7 @@ def mocked_authenticate(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -643,7 +646,7 @@ def mocked_authenticate(self): assert sum(1 for _ in files_in_result) == 0 - def test_handle_session_error_during_photo_iteration(self): + def test_handle_session_error_during_photo_iteration(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -653,8 +656,8 @@ def test_handle_session_error_during_photo_iteration(self): with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): - def mock_raise_response_error(offset): - raise PyiCloudAPIResponseException("Invalid global session", 100) + def mock_raise_response_error(_offset: int) -> NoReturn: + raise PyiCloudAPIResponseException("Invalid global session", "100") with mock.patch("time.sleep") as sleep_mock: with mock.patch.object(PhotoAlbum, "photos_request") as pa_photos_request: @@ -664,7 +667,7 @@ def mock_raise_response_error(offset): # but do nothing on the second try. orig_authenticate = PyiCloudService.authenticate - def mocked_authenticate(self): + def mocked_authenticate(self: PyiCloudService) -> None: if not hasattr(self, "already_authenticated"): orig_authenticate(self) setattr(self, "already_authenticated", True) @@ -689,7 +692,7 @@ def mocked_authenticate(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -720,7 +723,7 @@ def mocked_authenticate(self): assert sum(1 for _ in files_in_result) == 0 - def test_handle_connection_error(self): + def test_handle_connection_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -731,7 +734,7 @@ def test_handle_connection_error(self): with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): # Pass fixed client ID via environment variable - def mock_raise_response_error(arg): + def mock_raise_response_error(_arg: Any) -> NoReturn: raise ConnectionError("Connection Error") with mock.patch.object(PhotoAsset, "download") as pa_download: @@ -741,7 +744,7 @@ def mock_raise_response_error(arg): # but do nothing on the second try. orig_authenticate = PyiCloudService.authenticate - def mocked_authenticate(self): + def mocked_authenticate(self: PyiCloudService) -> None: if not hasattr(self, "already_authenticated"): orig_authenticate(self) setattr(self, "already_authenticated", True) @@ -766,7 +769,7 @@ def mocked_authenticate(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -794,7 +797,7 @@ def mocked_authenticate(self): assert sum(1 for _ in files_in_result) == 0 - def test_handle_albums_error(self): + def test_handle_albums_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -805,8 +808,8 @@ def test_handle_albums_error(self): with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): # Pass fixed client ID via environment variable - def mock_raise_response_error(): - raise PyiCloudAPIResponseException("Api Error", 100) + def mock_raise_response_error() -> None: + raise PyiCloudAPIResponseException("Api Error", "100") with mock.patch.object(PhotoLibrary, "_fetch_folders") as pa_photos_request: pa_photos_request.side_effect = mock_raise_response_error @@ -815,7 +818,7 @@ def mock_raise_response_error(): # but do nothing on the second try. orig_authenticate = PyiCloudService.authenticate - def mocked_authenticate(self): + def mocked_authenticate(self: PyiCloudService) -> None: if not hasattr(self, "already_authenticated"): orig_authenticate(self) setattr(self, "already_authenticated", True) @@ -840,7 +843,7 @@ def mocked_authenticate(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -856,7 +859,7 @@ def mocked_authenticate(self): assert sum(1 for _ in files_in_result) == 0 - def test_missing_size(self): + def test_missing_size(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -883,7 +886,7 @@ def test_missing_size(self): "3", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -921,7 +924,7 @@ def test_missing_size(self): self.assertEqual(sum(1 for _ in files_in_result), 0, "Files in result") - def test_size_fallback_to_original(self): + def test_size_fallback_to_original(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -956,7 +959,7 @@ def test_size_fallback_to_original(self): "thumb", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -995,7 +998,7 @@ def test_size_fallback_to_original(self): assert sum(1 for _ in files_in_result) == 0 - def test_force_size(self): + def test_force_size(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1028,7 +1031,7 @@ def test_force_size(self): "--force-size", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1061,7 +1064,7 @@ def test_force_size(self): assert sum(1 for _ in files_in_result) == 0 - def test_invalid_creation_date(self): + def test_invalid_creation_date(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1077,7 +1080,7 @@ def test_invalid_creation_date(self): # Can't mock `astimezone` because it's a readonly property, so have to # create a new class that inherits from datetime.datetime class NewDateTime(datetime.datetime): - def astimezone(self, tz=None): + def astimezone(self, _tz:(Optional[Any])=None) -> NoReturn: raise ValueError('Invalid date') dt_mock.return_value = NewDateTime(2018, 1, 1, 0, 0, 0) @@ -1098,7 +1101,7 @@ def astimezone(self, tz=None): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1141,7 +1144,7 @@ def astimezone(self, tz=None): reason="does not run on windows") @pytest.mark.skipif(sys.platform == 'darwin', reason="does not run on mac") - def test_invalid_creation_year(self): + def test_invalid_creation_year(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1157,7 +1160,7 @@ def test_invalid_creation_year(self): # Can't mock `astimezone` because it's a readonly property, so have to # create a new class that inherits from datetime.datetime class NewDateTime(datetime.datetime): - def astimezone(self, tz=None): + def astimezone(self, _tz:(Optional[Any])=None) -> NoReturn: raise ValueError('Invalid date') dt_mock.return_value = NewDateTime(5, 1, 1, 0, 0, 0) @@ -1178,7 +1181,7 @@ def astimezone(self, tz=None): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1217,7 +1220,7 @@ def astimezone(self, tz=None): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_unknown_item_type(self): + def test_unknown_item_type(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1247,7 +1250,7 @@ def test_unknown_item_type(self): "1", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1280,7 +1283,7 @@ def test_unknown_item_type(self): assert sum(1 for _ in files_in_result) == 0 - def test_download_and_dedupe_existing_photos(self): + def test_download_and_dedupe_existing_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1302,7 +1305,7 @@ def test_download_and_dedupe_existing_photos(self): # Download the first photo, but mock the video download orig_download = PhotoAsset.download - def mocked_download(self, size): + def mocked_download(self: PhotoAsset, size: str) -> Optional[Response]: if not hasattr(PhotoAsset, "already_downloaded"): response = orig_download(self, size) setattr(PhotoAsset, "already_downloaded", True) @@ -1400,7 +1403,7 @@ def mocked_download(self, size): assert result.exit_code == 0 - def test_download_photos_and_set_exif_exceptions(self): + def test_download_photos_and_set_exif_exceptions(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1437,7 +1440,7 @@ def test_download_photos_and_set_exif_exceptions(self): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1481,7 +1484,7 @@ def test_download_photos_and_set_exif_exceptions(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_download_chinese(self): + def test_download_chinese(self) -> None: base_dir = os.path.join( self.fixtures_path, inspect.stack()[0][3], "中文") cookie_dir = os.path.join(base_dir, "cookie") @@ -1513,7 +1516,7 @@ def test_download_chinese(self): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1563,7 +1566,7 @@ def test_download_chinese(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_download_after_delete(self): + def test_download_after_delete(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1599,7 +1602,7 @@ def test_download_after_delete(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "--delete-after-download", "-d", data_dir, @@ -1637,7 +1640,7 @@ def test_download_after_delete(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_download_after_delete_fail(self): + def test_download_after_delete_fail(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1663,7 +1666,7 @@ def test_download_after_delete_fail(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "--delete-after-download", "-d", data_dir, @@ -1697,7 +1700,7 @@ def test_download_after_delete_fail(self): assert sum(1 for _ in files_in_result) == 0 - def test_download_over_old_original_photos(self): + def test_download_over_old_original_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1738,7 +1741,7 @@ def test_download_over_old_original_photos(self): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1805,7 +1808,7 @@ def test_download_over_old_original_photos(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_download_normalized_names(self): + def test_download_normalized_names(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1849,7 +1852,7 @@ def test_download_normalized_names(self): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -1871,7 +1874,7 @@ def test_download_normalized_names(self): file_name))), f"File {file_name} expected, but does not exist" @pytest.mark.skip("not ready yet. may be not needed") - def test_download_watch(self): + def test_download_watch(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1893,19 +1896,19 @@ def test_download_watch(self): with open(os.path.join(data_dir, file_name), "a") as f: f.truncate(file_size) - def my_sleep(target_duration): - counter = 0 + # def my_sleep(_target_duration: int) -> Callable[[int], None]: + # counter: int = 0 - def sleep_(duration): - if counter > duration: - raise ValueError("SLEEP MOCK") - counter = counter + 1 - return sleep_ + # def sleep_(duration: int) -> None: + # if counter > duration: + # raise ValueError("SLEEP MOCK") + # counter = counter + 1 + # return sleep_ with mock.patch("time.sleep") as sleep_patched: # import random target_duration = 1 - sleep_patched.side_effect = my_sleep(target_duration) + # sleep_patched.side_effect = my_sleep(target_duration) with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): # Pass fixed client ID via environment variable runner = CliRunner(env={ @@ -1925,11 +1928,11 @@ def sleep_(duration): "--set-exif-datetime", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--watch-with-interval", - target_duration, + str(target_duration), "--cookie-directory", cookie_dir, ], @@ -1948,7 +1951,7 @@ def sleep_(duration): assert os.path.exists(os.path.join(data_dir, os.path.normpath( file_name))), f"File {file_name} expected, but does not exist" - def test_handle_internal_error_during_download(self): + def test_handle_internal_error_during_download(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -1958,7 +1961,7 @@ def test_handle_internal_error_during_download(self): with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): - def mock_raise_response_error(arg): + def mock_raise_response_error(_arg: Any) -> NoReturn: raise PyiCloudAPIResponseException( "INTERNAL_ERROR", "INTERNAL_ERROR") @@ -1983,7 +1986,7 @@ def mock_raise_response_error(arg): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -2013,7 +2016,7 @@ def mock_raise_response_error(arg): assert sum(1 for _ in files_in_result) == 0 - def test_handle_internal_error_during_photo_iteration(self): + def test_handle_internal_error_during_photo_iteration(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -2023,7 +2026,7 @@ def test_handle_internal_error_during_photo_iteration(self): with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): - def mock_raise_response_error(offset): + def mock_raise_response_error(_offset: int) -> NoReturn: raise PyiCloudAPIResponseException( "INTERNAL_ERROR", "INTERNAL_ERROR") @@ -2048,7 +2051,7 @@ def mock_raise_response_error(offset): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -2079,7 +2082,7 @@ def mock_raise_response_error(offset): assert sum(1 for _ in files_in_result) == 0 - def test_handle_io_error_mkdir(self): + def test_handle_io_error_mkdir(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -2108,7 +2111,7 @@ def test_handle_io_error_mkdir(self): "--skip-live-photos", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -2135,7 +2138,7 @@ def test_handle_io_error_mkdir(self): self.assertEqual(sum(1 for _ in files_in_result), 0, "Files at the end") - def test_dry_run(self): + def test_dry_run(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -2169,7 +2172,7 @@ def test_dry_run(self): "--no-progress-bar", "--dry-run", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -2217,7 +2220,7 @@ def test_dry_run(self): self.assertEqual(sum(1 for _ in files_in_result), 0, "Files in the result") - def test_download_after_delete_dry_run(self): + def test_download_after_delete_dry_run(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -2225,7 +2228,7 @@ def test_download_after_delete_dry_run(self): for dir in [base_dir, cookie_dir, data_dir]: recreate_path(dir) - def raise_response_error(a0_, a1_, a2_): + def raise_response_error(a0_:logging.Logger, a1_:PyiCloudService, a2_: PhotoAsset) -> NoReturn: raise Exception("Unexpected call to delete_photo") with mock.patch.object(piexif, "insert") as piexif_patched: @@ -2258,7 +2261,7 @@ def raise_response_error(a0_, a1_, a2_): "--no-progress-bar", "--dry-run", "--threads-num", - 1, + "1", "--delete-after-download", "-d", data_dir, diff --git a/tests/test_email_notifications.py b/tests/test_email_notifications.py index 40c5d3157..95ee39413 100644 --- a/tests/test_email_notifications.py +++ b/tests/test_email_notifications.py @@ -16,14 +16,14 @@ class EmailNotificationsTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") @freeze_time("2018-01-01") - def test_2sa_required_email_notification(self): + def test_2sa_required_email_notification(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -77,7 +77,7 @@ def test_2sa_required_email_notification(self): ) @freeze_time("2018-01-01") - def test_2sa_notification_without_smtp_login_and_tls(self): + def test_2sa_notification_without_smtp_login_and_tls(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -126,7 +126,7 @@ def test_2sa_notification_without_smtp_login_and_tls(self): ) @freeze_time("2018-01-01") - def test_2sa_required_notification_script(self): + def test_2sa_required_notification_script(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -160,7 +160,7 @@ def test_2sa_required_notification_script(self): subprocess_patched.assert_called_once_with(["./test_script.sh"]) @freeze_time("2018-01-01") - def test_2sa_required_email_notification_from(self): + def test_2sa_required_email_notification_from(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") diff --git a/tests/test_folder_structure.py b/tests/test_folder_structure.py index 8e0725674..32df39ee9 100644 --- a/tests/test_folder_structure.py +++ b/tests/test_folder_structure.py @@ -1,3 +1,4 @@ +from typing import List from unittest import TestCase import os from os.path import normpath @@ -15,14 +16,14 @@ class FolderStructureTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") # This is basically a copy of the listing_recent_photos test # - def test_default_folder_structure(self): + def test_default_folder_structure(self) -> None: ### Tests if the default directory structure is constructed correctly ### base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -31,7 +32,7 @@ def test_default_folder_structure(self): for dir in [base_dir, cookie_dir, data_dir]: recreate_path(dir) - files_to_download = [ + files_to_download: List[str] = [ ] # Note - This test uses the same cassette as test_download_photos.py @@ -96,7 +97,7 @@ def test_default_folder_structure(self): assert os.path.exists(os.path.join(data_dir, os.path.normpath(file_name))), f"File {file_name} expected, but does not exist" - def test_folder_structure_none(self): + def test_folder_structure_none(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -104,7 +105,7 @@ def test_folder_structure_none(self): for dir in [base_dir, cookie_dir, data_dir]: recreate_path(dir) - files_to_download = [] + files_to_download: List[str] = [] # Note - This test uses the same cassette as test_download_photos.py with vcr.use_cassette(os.path.join(self.vcr_path, "listing_photos.yml")): diff --git a/tests/test_listing_albums.py b/tests/test_listing_albums.py index 7bd9c66fc..618d84fa4 100644 --- a/tests/test_listing_albums.py +++ b/tests/test_listing_albums.py @@ -17,13 +17,13 @@ class ListingAlbumsTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_listing_albums(self): + def test_listing_albums(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") diff --git a/tests/test_listing_libraries.py b/tests/test_listing_libraries.py index a3d13b89b..54b719ead 100644 --- a/tests/test_listing_libraries.py +++ b/tests/test_listing_libraries.py @@ -17,13 +17,13 @@ class ListingLibraryTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_listing_library(self): + def test_listing_library(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -57,7 +57,7 @@ def test_listing_library(self): assert result.exit_code == 0 - def test_listing_library_error(self): + def test_listing_library_error(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") diff --git a/tests/test_listing_recent_photos.py b/tests/test_listing_recent_photos.py index b5169a987..3215ce64b 100644 --- a/tests/test_listing_recent_photos.py +++ b/tests/test_listing_recent_photos.py @@ -16,13 +16,13 @@ class ListingRecentPhotosTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_listing_recent_photos(self): + def test_listing_recent_photos(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -48,7 +48,7 @@ def test_listing_recent_photos(self): "--only-print-filenames", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -90,7 +90,7 @@ def test_listing_recent_photos(self): assert sum(1 for _ in files_in_result) == 0 - def test_listing_photos_does_not_create_folders(self): + def test_listing_photos_does_not_create_folders(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -120,7 +120,7 @@ def test_listing_photos_does_not_create_folders(self): "--only-print-filenames", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -139,7 +139,7 @@ def test_listing_photos_does_not_create_folders(self): assert sum(1 for _ in files_in_result) == 0 - def test_listing_recent_photos_with_missing_filenameEnc(self): + def test_listing_recent_photos_with_missing_filenameEnc(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -167,7 +167,7 @@ def test_listing_recent_photos_with_missing_filenameEnc(self): "--only-print-filenames", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -176,7 +176,7 @@ def test_listing_recent_photos_with_missing_filenameEnc(self): ) print_result_exception(result) - self.assertEqual.__self__.maxDiff = None + self.assertEqual.__self__.maxDiff = None # type: ignore[attr-defined] filenames = result.output.splitlines() @@ -209,7 +209,7 @@ def test_listing_recent_photos_with_missing_filenameEnc(self): # This was used to solve the missing filenameEnc error. I found # another case where it might crash. (Maybe Apple changes the downloadURL key) - def test_listing_recent_photos_with_missing_downloadURL(self): + def test_listing_recent_photos_with_missing_downloadURL(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") data_dir = os.path.join(base_dir, "data") @@ -237,7 +237,7 @@ def test_listing_recent_photos_with_missing_downloadURL(self): "--only-print-filenames", "--no-progress-bar", "--threads-num", - 1, + "1", "-d", data_dir, "--cookie-directory", @@ -246,7 +246,7 @@ def test_listing_recent_photos_with_missing_downloadURL(self): ) print_result_exception(result) - self.assertEqual.__self__.maxDiff = None + self.assertEqual.__self__.maxDiff = None # type: ignore[attr-defined] self.assertEqual("""\ KeyError: 'downloadURL' attribute was not found in the photo fields. icloudpd has saved the photo record to: ./icloudpd-photo-error.json diff --git a/tests/test_logger.py b/tests/test_logger.py index 0cc064949..23a8cc87f 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -12,7 +12,7 @@ class LoggerTestCase(TestCase): # Tests the formatter that is set up in setup_logger() @freeze_time("2018-01-01 00:00:00-0000") - def test_logger_output(self): + def test_logger_output(self) -> None: logger = setup_logger() test_logger = logging.getLogger("icloudpd-test") string_io = StringIO() @@ -31,25 +31,25 @@ def test_logger_output(self): self.assertIn("DEBUG Test debug output", output) self.assertIn("ERROR Test error output", output) - def test_logger_tqdm_fallback(self): + def test_logger_tqdm_fallback(self) -> None: logging.setLoggerClass(IPDLogger) logger = logging.getLogger("icloudpd-test") - logger.log = MagicMock() - logger.set_tqdm_description("foo") + logger.log = MagicMock() # type: ignore[assignment] + logger.set_tqdm_description("foo") #type: ignore[attr-defined] logger.log.assert_called_once_with(logging.INFO, "foo") - logger.log = MagicMock() - logger.tqdm_write("bar") + logger.log = MagicMock() #type: ignore[method-assign] + logger.tqdm_write("bar")#type: ignore[attr-defined] logger.log.assert_called_once_with(logging.INFO, "bar") - logger.set_tqdm(MagicMock()) - logger.tqdm.write = MagicMock() - logger.tqdm.set_description = MagicMock() - logger.log = MagicMock() - logger.set_tqdm_description("baz") - logger.tqdm.set_description.assert_called_once_with("baz") - logger.tqdm_write("qux") - logger.tqdm.write.assert_called_once_with("qux") + logger.set_tqdm(MagicMock())#type: ignore[attr-defined] + logger.tqdm.write = MagicMock()#type: ignore[attr-defined] + logger.tqdm.set_description = MagicMock()#type: ignore[attr-defined] + logger.log = MagicMock() #type: ignore[method-assign] + logger.set_tqdm_description("baz")#type: ignore[attr-defined] + logger.tqdm.set_description.assert_called_once_with("baz")#type: ignore[attr-defined] + logger.tqdm_write("qux")#type: ignore[attr-defined] + logger.tqdm.write.assert_called_once_with("qux")#type: ignore[attr-defined] logger.log.assert_not_called - logger.set_tqdm(None) + logger.set_tqdm(None)#type: ignore[attr-defined] diff --git a/tests/test_string_helpers.py b/tests/test_string_helpers.py index b8d414f82..d92c91500 100644 --- a/tests/test_string_helpers.py +++ b/tests/test_string_helpers.py @@ -2,7 +2,7 @@ from icloudpd.string_helpers import truncate_middle class TruncateMiddleTestCase(TestCase): - def test_truncate_middle(self): + def test_truncate_middle(self) -> None: assert truncate_middle("test_filename.jpg", 50) == "test_filename.jpg" assert truncate_middle("test_filename.jpg", 17) == "test_filename.jpg" assert truncate_middle("test_filename.jpg", 16) == "test_f...me.jpg" diff --git a/tests/test_two_step_auth.py b/tests/test_two_step_auth.py index 37ba25943..0484fb03a 100644 --- a/tests/test_two_step_auth.py +++ b/tests/test_two_step_auth.py @@ -18,13 +18,13 @@ class TwoStepAuthTestCase(TestCase): @pytest.fixture(autouse=True) - def inject_fixtures(self, caplog): + def inject_fixtures(self, caplog: pytest.LogCaptureFixture) -> None: self._caplog = caplog self.root_path = path_from_project_root(__file__) self.fixtures_path = os.path.join(self.root_path, "fixtures") self.vcr_path = os.path.join(self.root_path, "vcr_cassettes") - def test_2sa_flow_invalid_code(self): + def test_2sa_flow_invalid_code(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -56,7 +56,7 @@ def test_2sa_flow_invalid_code(self): assert result.exit_code == 1 - def test_2sa_flow_valid_code(self): + def test_2sa_flow_valid_code(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -98,7 +98,7 @@ def test_2sa_flow_valid_code(self): ) assert result.exit_code == 0 - def test_2sa_flow_failed_send_code(self): + def test_2sa_flow_failed_send_code(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -140,7 +140,7 @@ def test_2sa_flow_failed_send_code(self): ) assert result.exit_code == 1 - def test_2fa_flow_invalid_code(self): + def test_2fa_flow_invalid_code(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie") @@ -172,7 +172,7 @@ def test_2fa_flow_invalid_code(self): assert result.exit_code == 1 - def test_2fa_flow_valid_code(self): + def test_2fa_flow_valid_code(self) -> None: base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3]) cookie_dir = os.path.join(base_dir, "cookie")