Skip to content

Commit

Permalink
Authentication Fix: Merge updated auth flow from pyicloud (#734)
Browse files Browse the repository at this point in the history
* Merged updated auth flow from pyicloud - including 2FA. Retain Library and Domain support.

* Update authentication.py to use 2FA

* Fix AttributeError trying to use 'get' on Response object

* Cleanup unused imports.  Updated a couple incorrect references from 'apple_id' to 'accountName'

* Tests: Updated 2 VCR's for better underatanding of further necessary updates.

* icloudpd: Fix: No password provided or in keyring now prompts correctly
icloudpd: Refactor: 2SA prompt requires device selection to send code
icloudpd: Feat: New --auth-only flag to trigger log in, 2SA/2FA, and set session/cookie file.
              Future log in will validate the tokens without running through full signin flow.
              Can be used to validate the session tokens are still good without having to ping
              the photo endpoints.

pyicloud_ipd: Clean: Removed unused imports
pyicloud_ipd: Fix: Capture additional header data
pyicloud_ipd: Fix: Invalid Username/Password correctly caught now
pyicloud_ipd: Fix: Changes in certain error responses now captured
pyicloud_ipd: Fix: Bypass 2sv/trust when using 2SA

Tests: Refactored authentication tests
Tests: Refactored two_step_auth tests (TODO: Add 2FA tests)
Tests: Updated/Created additional VCRs for auth tests

* Tests: authentication and two_step_auth tests now pass

* icloudpd: Fix: Correct exception reference for API error
pyicloud_ipd: Fix: Correct exception reference for API and NoStoredPassword errors

Tests: Refactor: All remaining tests now pass
Tests: Refactor: Update corresponding VCRs for new auth flow
Tests: Cookie/Session files stored in individual test fixtures for running tests independently

* icloudpd: Fix: Update exception reference
icloudpd: Style: Format update (scripts/format)

* fix: Update pyicloud_ipd/cmdline.py to use 2FA (in addition to 2SA)
docs: Update CHANGELOG.md and README.md
  • Loading branch information
scaraebeus authored Dec 19, 2023
1 parent a3d351f commit 87d3394
Show file tree
Hide file tree
Showing 42 changed files with 5,045 additions and 2,652 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
## Unreleased

- fix: macos binary failing [#668](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/668) [#700](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/700)
- fix: 'Invalid email/password combination' exception due to recent iCloud changes [#729](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/729)
- feature: `--auth-only` parameter to independently create/validate session tokens without listing/downloading photos
- feature: 2FA validation merged from `pyicloud`

## 1.16.3 (2023-12-04)

Expand Down
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ There are three ways to run `icloudpd`:
- One time download and an option to monitor for iCloud changes continuously (`--watch-with-interval` option)
- Optimizations for incremental runs (`--until-found` and `--recent` options)
- Photo meta data (EXIF) updates (`--set-exif-datetime` option)
- ... and many (use `--help` option to get full list)
- ... and many more (use `--help` option to get full list)

## Experimental Mode

Expand All @@ -39,7 +39,15 @@ To keep your iCloud photo collection synchronized to your local system:
icloudpd --directory /data --username [email protected] --watch-with-interval 3600
```

Synchronization logic can be adjusted with command-line parameters. Run `icloudpd --help` to get full list.
- Synchronization logic can be adjusted with command-line parameters. Run `icloudpd --help` to get full list.

To independently create and authorize a session (and complete 2SA/2FA validation if needed) on your local system:

```
icloudpd --username [email protected] --password my_password --auth-only
```

- This feature can also be used to check and verify that the session is still authenticated.

## FAQ

Expand Down
48 changes: 32 additions & 16 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,26 @@ def authenticate_(
client_id=client_id,
)
break
except pyicloud_ipd.exceptions.NoStoredPasswordAvailable:
except pyicloud_ipd.exceptions.PyiCloudNoStoredPasswordAvailableException:
# Prompt for password if not stored in PyiCloud's keyring
password = click.prompt("iCloud Password", hide_input=True)

if icloud.requires_2sa:
if icloud.requires_2fa:
if raise_error_on_2sa:
raise TwoStepAuthRequiredError(
"Two-step/two-factor authentication is required"
)
logger.info("Two-step/two-factor authentication is required")
logger.info("Two-step/two-factor authentication is required (2fa)")
request_2fa(icloud, logger)

elif icloud.requires_2sa:
if raise_error_on_2sa:
raise TwoStepAuthRequiredError(
"Two-step/two-factor authentication is required"
)
logger.info("Two-step/two-factor authentication is required (2sa)")
request_2sa(icloud, logger)

return icloud
return authenticate_

Expand All @@ -65,25 +74,17 @@ def request_2sa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger):
device.get("phoneNumber"))))
# pylint: enable-msg=consider-using-f-string

# pylint: disable-msg=superfluous-parens
print(f" {devices_count}: Enter two-factor authentication code")
# pylint: enable-msg=superfluous-parens
device_index = click.prompt(
"Please choose an option:",
default=0,
type=click.IntRange(
0,
devices_count))
devices_count - 1))

if device_index == devices_count:
# We're using the 2FA code that was automatically sent to the user's device,
# so can just use an empty dict()
device = {}
else:
device = devices[device_index]
if not icloud.send_verification_code(device):
logger.error("Failed to send two-factor authentication code")
sys.exit(1)
device = devices[device_index]
if not icloud.send_verification_code(device):
logger.error("Failed to send two-factor authentication code")
sys.exit(1)

code = click.prompt("Please enter two-factor authentication code")
if not icloud.validate_verification_code(device, code):
Expand All @@ -96,3 +97,18 @@ def request_2sa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger):
"the two-step authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)


def request_2fa(icloud: pyicloud_ipd.PyiCloudService, logger: logging.Logger):
"""Request two-factor authentication."""
code = click.prompt("Please enter two-factor authentication code")
if not icloud.validate_2fa_code(code):
logger.error("Failed to verify two-factor authentication code")
sys.exit(1)
logger.info(
"Great, you're all set up. The script can now be run without "
"user interaction until 2SA expires.\n"
"You can set up email notifications for when "
"the two-step authentication expires.\n"
"(Use --help to view information about SMTP options.)"
)
88 changes: 52 additions & 36 deletions src/icloudpd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from tzlocal import get_localzone
from pyicloud_ipd import PyiCloudService

from pyicloud_ipd.exceptions import PyiCloudAPIResponseError
from pyicloud_ipd.exceptions import PyiCloudAPIResponseException
from pyicloud_ipd.services.photos import PhotoAsset

from icloudpd.authentication import authenticator, TwoStepAuthRequiredError
Expand Down Expand Up @@ -55,6 +55,11 @@
"(default: use PyiCloud keyring or prompt for password)",
metavar="<password>",
)
@click.option(
"--auth-only",
help="Create/Update cookie and session tokens only.",
is_flag=True,
)
@click.option(
"--cookie-directory",
help="Directory to store cookies for authentication "
Expand Down Expand Up @@ -136,13 +141,12 @@
+ "(Does not download or delete any files.)",
is_flag=True,
)
@click.option(
"--folder-structure",
help="Folder structure (default: {:%Y/%m/%d}). "
"If set to 'none' all photos will just be placed into the download directory",
metavar="<folder_structure>",
default="{:%Y/%m/%d}",
)
@click.option("--folder-structure",
help="Folder structure (default: {:%Y/%m/%d}). "
"If set to 'none' all photos will just be placed into the download directory",
metavar="<folder_structure>",
default="{:%Y/%m/%d}",
)
@click.option(
"--set-exif-datetime",
help="Write the DateTimeOriginal exif tag from file creation date, " +
Expand Down Expand Up @@ -235,14 +239,16 @@
is_flag=True,
default=False,
)
# a hacky way to get proper version because automatic detection does not work for some reason
# a hacky way to get proper version because automatic detection does not
# work for some reason
@click.version_option(version="1.16.3")
# pylint: disable-msg=too-many-arguments,too-many-statements
# pylint: disable-msg=too-many-branches,too-many-locals
def main(
directory: Optional[str],
username: Optional[str],
password: Optional[str],
auth_only: bool,
cookie_directory: str,
size: str,
live_photo_size: str,
Expand Down Expand Up @@ -299,15 +305,17 @@ def main(
with logging_redirect_tqdm():

# check required directory param only if not list albums
if not list_albums and not list_libraries and not directory:
print('--directory, --list-libraries or --list-albums are required')
if not list_albums and not list_libraries and not directory and not auth_only:
print(
'--auth-only, --directory, --list-libraries or --list-albums are required')
sys.exit(2)

if auto_delete and delete_after_download:
print('--auto-delete and --delete-after-download are mutually exclusive')
sys.exit(2)

if watch_with_interval and (list_albums or only_print_filenames): # pragma: no cover
if watch_with_interval and (
list_albums or only_print_filenames): # pragma: no cover
print(
'--watch_with_interval is not compatible with --list_albums, --only_print_filenames'
)
Expand All @@ -326,10 +334,13 @@ def main(
set_exif_datetime,
skip_live_photos,
live_photo_size,
dry_run) if directory is not None else (lambda _s: lambda _c, _p: False),
dry_run) if directory is not None else (
lambda _s: lambda _c,
_p: False),
directory,
username,
password,
auth_only,
cookie_directory,
size,
recent,
Expand All @@ -355,9 +366,7 @@ def main(
domain,
logger,
watch_with_interval,
dry_run
)
)
dry_run))


# pylint: disable-msg=too-many-arguments,too-many-statements
Expand All @@ -377,7 +386,8 @@ def download_builder(
live_photo_size: str,
dry_run: bool) -> Callable[[PyiCloudService], Callable[[Counter, PhotoAsset], bool]]:
"""factory for downloader"""
def state_(icloud: PyiCloudService) -> Callable[[Counter, PhotoAsset], bool]:
def state_(
icloud: PyiCloudService) -> Callable[[Counter, PhotoAsset], bool]:
def download_photo_(counter: Counter, photo: PhotoAsset) -> bool:
"""internal function for actually downloading the photos"""
filename = clean_filename(photo.filename)
Expand Down Expand Up @@ -507,17 +517,14 @@ def download_photo_(counter: Counter, photo: PhotoAsset) -> bool:
)

download_result = download.download_media(
logger, dry_run, icloud, photo, download_path, download_size
)
logger, dry_run, icloud, photo, download_path, download_size)
success = download_result

if download_result:
if not dry_run and \
set_exif_datetime and \
clean_filename(photo.filename) \
.lower() \
.endswith((".jpg", ".jpeg")) and \
not exif_datetime.get_photo_exif(logger, download_path):
if not dry_run and set_exif_datetime and clean_filename(
photo.filename) .lower() .endswith(
(".jpg", ".jpeg")) and not exif_datetime.get_photo_exif(
logger, download_path):
# %Y:%m:%d looks wrong, but it's the correct format
date_str = created_date.strftime(
"%Y-%m-%d %H:%M:%S%z")
Expand Down Expand Up @@ -582,8 +589,7 @@ def download_photo_(counter: Counter, photo: PhotoAsset) -> bool:
truncated_path
)
download_result = download.download_media(
logger, dry_run, icloud, photo, lp_download_path, lp_size
)
logger, dry_run, icloud, photo, lp_download_path, lp_size)
success = download_result and success
if download_result:
logger.info(
Expand All @@ -595,7 +601,10 @@ def download_photo_(counter: Counter, photo: PhotoAsset) -> bool:
return state_


def delete_photo(logger: logging.Logger, icloud: PyiCloudService, photo: PhotoAsset):
def delete_photo(
logger: logging.Logger,
icloud: PyiCloudService,
photo: PhotoAsset):
"""Delete a photo from the iCloud account."""
clean_filename_local = clean_filename(photo.filename)
logger.debug(
Expand Down Expand Up @@ -626,7 +635,10 @@ def delete_photo(logger: logging.Logger, icloud: PyiCloudService, photo: PhotoAs
"Deleted %s in iCloud", clean_filename_local)


def delete_photo_dry_run(logger: logging.Logger, _icloud: PyiCloudService, photo: PhotoAsset):
def delete_photo_dry_run(
logger: logging.Logger,
_icloud: PyiCloudService,
photo: PhotoAsset):
"""Dry run for deleting a photo from the iCloud"""
logger.info(
"[DRY RUN] Would delete %s in iCloud",
Expand Down Expand Up @@ -706,6 +718,7 @@ def core(
directory: Optional[str],
username: Optional[str],
password: Optional[str],
auth_only: bool,
cookie_directory: str,
size: str,
recent: Optional[int],
Expand Down Expand Up @@ -764,6 +777,10 @@ def core(
)
return 1

if auth_only:
logger.info("Authentication completed successfully")
return 0

download_photo = downloader(icloud)

# Access to the selected library. Defaults to the primary photos object.
Expand All @@ -788,7 +805,7 @@ def core(
logger.error("Unknown library: %s", library)
return 1
photos = library_object.albums[album]
except PyiCloudAPIResponseError as err:
except PyiCloudAPIResponseException as err:
# For later: come up with a nicer message to the user. For now take the
# exception text
logger.error("error?? %s", err)
Expand Down Expand Up @@ -816,8 +833,8 @@ def core(
logger, icloud)
internal_error_handler = internal_error_handle_builder(logger)

error_handler = compose_handlers([session_exception_handler, internal_error_handler
])
error_handler = compose_handlers(
[session_exception_handler, internal_error_handler])

photos.exception_handler = error_handler

Expand Down Expand Up @@ -863,7 +880,7 @@ def core(
video_suffix = " and videos" if not skip_videos else ""
logger.info(
("Downloading %s %s" +
" photo%s%s to %s ..."),
" photo%s%s to %s ..."),
photos_count_str,
size,
plural_suffix,
Expand All @@ -883,8 +900,7 @@ def should_break(counter: Counter) -> bool:
if should_break(consecutive_files_found):
logger.info(
"Found %s consecutive previously downloaded photos. Exiting",
until_found
)
until_found)
break
item = next(photos_iterator)
if download_photo(
Expand All @@ -907,7 +923,7 @@ def delete_cmd():

if auto_delete:
autodelete_photos(logger, dry_run, library_object,
folder_structure, directory)
folder_structure, directory)

if watch_interval: # pragma: no cover
logger.info(f"Waiting for {watch_interval} sec...")
Expand Down
10 changes: 6 additions & 4 deletions src/icloudpd/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import time
import datetime
from tzlocal import get_localzone
from requests.exceptions import ConnectionError # pylint: disable=redefined-builtin
from requests.exceptions import ConnectionError # pylint: disable=redefined-builtin
import pyicloud_ipd # pylint: disable=redefined-builtin
from pyicloud_ipd.exceptions import PyiCloudAPIResponseError
from pyicloud_ipd.exceptions import PyiCloudAPIResponseException
from pyicloud_ipd.services.photos import PhotoAsset

# Import the constants object so that we can mock WAIT_SECONDS in tests
Expand Down Expand Up @@ -52,7 +52,9 @@ def mkdirs_for_path(logger: logging.Logger, download_path: str) -> bool:
return False


def mkdirs_for_path_dry_run(logger: logging.Logger, download_path: str) -> bool:
def mkdirs_for_path_dry_run(
logger: logging.Logger,
download_path: str) -> bool:
""" DRY Run for Creating hierarchy of folders for file path """
download_dir = os.path.dirname(download_path)
if not os.path.exists(download_dir):
Expand Down Expand Up @@ -123,7 +125,7 @@ def download_media(
)
break

except (ConnectionError, socket.timeout, PyiCloudAPIResponseError) as ex:
except (ConnectionError, socket.timeout, PyiCloudAPIResponseException) as ex:
if "Invalid global session" in str(ex):
logger.error(
"Session error, re-authenticating...")
Expand Down
2 changes: 1 addition & 1 deletion src/icloudpd/email_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def send_2sa_notification(
smtp_port: int,
smtp_no_tls: bool,
to_addr: Optional[str],
from_addr: Optional[str]=None):
from_addr: Optional[str] = None):
"""Send an email notification when 2SA is expired"""
to_addr = cast(str, to_addr if to_addr is not None else smtp_email)
from_addr = from_addr if from_addr is not None else (
Expand Down
Loading

0 comments on commit 87d3394

Please sign in to comment.