Skip to content

Commit

Permalink
use new 2fa api for sms #803 (#878)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyNikiforov authored Jun 29, 2024
1 parent 356ecb5 commit 482119f
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 36 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- fix: match SMS MFA to icloud.com behavior [#803](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/803)

## 1.20.1 (2024-06-18)

- fix: keyring handling in `icloud` [#871](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/871)
Expand Down
30 changes: 11 additions & 19 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,43 +124,35 @@ def request_2sa(icloud: PyiCloudService, logger: logging.Logger) -> None:

def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
"""Request two-factor authentication."""
try:
devices = icloud.trusted_devices
except:
devices = []
if len(devices) > 0:
if len(devices) > 99:
devices = icloud.get_trusted_phone_numbers()
devices_count = len(devices)
if devices_count > 0:
if devices_count > 99:
logger.error("Too many trusted devices for authentication")
sys.exit(1)

for i, device in enumerate(devices):
# pylint: disable-msg=consider-using-f-string
print(
" %s: %s" %
(i, device.get(
"deviceName", "SMS to %s" %
device.get("phoneNumber"))))
# pylint: enable-msg=consider-using-f-string
print(f" {i}: {device.obfuscated_number}")

index_str = f"..{len(devices) - 1}" if len(devices) > 1 else ""
index_str = f"..{devices_count - 1}" if devices_count > 1 else ""
code:int = click.prompt(
f"Please enter two-factor authentication code or device index (0{index_str}) to send SMS with a code",
type=click.IntRange(
0,
999999))

if code < 100:
if code < devices_count:
# need to send code
device = devices[code]
if not icloud.send_verification_code(device):
if not icloud.send_2fa_code_sms(device.id):
logger.error("Failed to send two-factor authentication code")
sys.exit(1)
code = click.prompt(
"Please enter two-factor authentication code that you received over SMS",
type=click.IntRange(
0,
100000,
999999))
if not icloud.validate_verification_code(device, str(code)):
if not icloud.validate_2fa_code_sms(device.id, code):
logger.error("Failed to verify two-factor authentication code")
sys.exit(1)
else:
Expand All @@ -171,7 +163,7 @@ def request_2fa(icloud: PyiCloudService, logger: logging.Logger) -> None:
code = click.prompt(
"Please enter two-factor authentication code",
type=click.IntRange(
0,
100000,
999999))
if not icloud.validate_2fa_code(str(code)):
logger.error("Failed to verify two-factor authentication code")
Expand Down
73 changes: 71 additions & 2 deletions src/pyicloud_ipd/base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import sys
from typing import Any, Callable, Dict, Optional, Sequence
from typing import Any, Callable, Dict, NamedTuple, Optional, Sequence
import typing
from uuid import uuid1
import json
Expand All @@ -10,6 +10,8 @@
import http.cookiejar as cookielib
import getpass

from requests import PreparedRequest, Request, Response

from pyicloud_ipd.exceptions import (
PyiCloudConnectionException,
PyiCloudFailedLoginException,
Expand All @@ -26,7 +28,7 @@
from pyicloud_ipd.services.photos import PhotosService
from pyicloud_ipd.services.account import AccountService
from pyicloud_ipd.session import PyiCloudPasswordFilter, PyiCloudSession
from pyicloud_ipd.utils import get_password_from_keyring
from pyicloud_ipd.sms import AuthenticatedSession, TrustedDevice, build_send_sms_code_request, build_trusted_phone_numbers_request, build_verify_sms_code_request, parse_trusted_phone_numbers_response


LOGGER = logging.getLogger(__name__)
Expand All @@ -42,6 +44,12 @@
"scnt": "scnt",
}

class TrustedPhoneContextProvider(NamedTuple):
domain: str
oauth_session: AuthenticatedSession



class PyiCloudService:
"""
A base authentication class for the iCloud service. Handles the
Expand Down Expand Up @@ -87,6 +95,7 @@ def __init__(
else:
raise NotImplementedError(f"Domain '{domain}' is not supported yet")

self.domain = domain

if cookie_directory:
self._cookie_directory = path.expanduser(
Expand Down Expand Up @@ -338,6 +347,48 @@ def trusted_devices(self) -> Sequence[Dict[str, Any]]:
return devices
return []

def send_request(self, request: PreparedRequest) -> Response:
return self.session.send(request)

def get_oauth_session(self) -> AuthenticatedSession:
return AuthenticatedSession(client_id = self.client_id, scnt = self.session_data["scnt"], session_id = self.session_data["session_id"])

def get_trusted_phone_numbers(self) -> Sequence[TrustedDevice]:
""" Returns list of trusted phone number for sms 2fa """

oauth_session = self.get_oauth_session()
context = TrustedPhoneContextProvider(domain = self.domain, oauth_session=oauth_session)

req = build_trusted_phone_numbers_request(context)
request = Request(
method = req.method,
url = req.url,
headers= req.headers
).prepare()

response = self.send_request(request)

return parse_trusted_phone_numbers_response(response)

def send_2fa_code_sms(self, device_id: int) -> bool:
""" Requests that a verification code is sent to the given device"""

oauth_session = self.get_oauth_session()
context = TrustedPhoneContextProvider(domain = self.domain, oauth_session=oauth_session)

req = build_send_sms_code_request(context, device_id)
request = Request(
method = req.method,
url = req.url,
headers= req.headers,
data = req.data,
json = req.json,
).prepare()

response = self.send_request(request)

return response.ok

def send_verification_code(self, device: Dict[str, Any]) -> bool:
""" Requests that a verification code is sent to the given device"""
data = json.dumps(device)
Expand Down Expand Up @@ -372,6 +423,24 @@ def validate_verification_code(self, device: Dict[str, Any], code: str) -> bool:

return not self.requires_2sa

def validate_2fa_code_sms(self, device_id: int, code:int) -> bool:
"""Verifies a verification code received via Apple's 2FA system through SMS."""

oauth_session = self.get_oauth_session()
context = TrustedPhoneContextProvider(domain = self.domain, oauth_session=oauth_session)

req = build_verify_sms_code_request(context, device_id, code)
request = Request(
method = req.method,
url = req.url,
headers= req.headers,
data = req.data,
json = req.json,
).prepare()
response = self.send_request(request)

return response.ok

def validate_2fa_code(self, code:str) -> bool:
"""Verifies a verification code received via Apple's 2FA system (HSA2)."""
data = {"securityCode": {"code": code}}
Expand Down
172 changes: 172 additions & 0 deletions src/pyicloud_ipd/sms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from html.parser import HTMLParser
import json
from typing import Any, List, Mapping, NamedTuple, Optional, Protocol, Sequence, Tuple

class _SMSParser(HTMLParser):
def __init__(self) -> None:
# initialize the base class
super(_SMSParser, self).__init__()
self._is_boot_args = False
self.sms_data: Mapping[str, Any] = {}

def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
if tag == "script":
self._is_boot_args = ("type", "application/json") in attrs and ("class", "boot_args") in attrs

def handle_endtag(self, tag: str) -> None:
if tag == "script":
self._is_boot_args = False

def handle_data(self, data: str) -> None:
if self._is_boot_args:
self.sms_data = json.loads(data)

class TrustedDevice(Protocol):
@property
def id(self) -> int: ...
@property
def obfuscated_number(self) -> str: ...

class _InternalTrustedDevice(NamedTuple):
id: int
obfuscated_number: str

def _map_to_trusted_device(device: Mapping[str, Any]) -> Optional[TrustedDevice]:
id: Optional[int] = device.get("id")
number: Optional[str] = device.get("obfuscatedNumber")
if id is None or number is None:
return None
return _InternalTrustedDevice(id=id, obfuscated_number=number)

class _Response(Protocol):
@property
def status_code(self) -> int: ...
@property
def text(self) -> str: ...

def parse_trusted_phone_numbers_response(response: _Response) -> Sequence[TrustedDevice]:
""" Parses html response for the list of available trusted phone numbers"""
if response.status_code in [200, 204]:
return parse_trusted_phone_numbers_payload(response.text)
return []

def parse_trusted_phone_numbers_payload(content: str) -> Sequence[TrustedDevice]:
""" Parses html response for the list of available trusted phone numbers"""
parser = _SMSParser()
parser.feed(content)
parser.close()
numbers: Sequence[Mapping[str, Any]] = parser.sms_data.get("direct", {}).get("twoSV", {}).get("phoneNumberVerification", {}).get("trustedPhoneNumbers", [])
return list((item for item in map(_map_to_trusted_device, numbers) if item is not None))

class AuthenticatedSession(NamedTuple):
client_id: str
scnt: str
session_id: str

def _oauth_const_headers() -> Mapping[str, str]:
return {
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
"X-Apple-OAuth-Require-Grant-Code": "true",
"X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
}

def _oauth_redirect_header(domain: str) -> Mapping[str, str]:
return {
"X-Apple-OAuth-Redirect-URI": "https://www.icloud.com.cn" if domain == "cn" else "https://www.icloud.com",
}

def _oauth_headers(auth_session: AuthenticatedSession) -> Mapping[str, str]:
""" Headers with OAuth session """

return {
"X-Apple-OAuth-State": auth_session.client_id,
"scnt": auth_session.scnt,
"X-Apple-ID-Session-Id": auth_session.session_id
}

def _auth_url(domain: str) -> str:
return "https://idmsa.apple.com.cn/appleauth/auth" if domain == "cn" else "https://idmsa.apple.com/appleauth/auth"

class _DomainProvider(Protocol):
@property
def domain(self) -> str: ...

class _OAuthSessionProvider(Protocol):
@property
def oauth_session(self) -> AuthenticatedSession: ...

class _TrustedPhoneContextProvider(_DomainProvider, _OAuthSessionProvider, Protocol): ...

class Request(Protocol):
@property
def method(self) -> str: ...
@property
def url(self) -> str: ...
@property
def headers(self) -> Mapping[str, str]: ...
@property
def data(self) -> Optional[str]: ...
@property
def json(self) -> Optional[Mapping[str, Any]]: ...

class _InternalRequest(NamedTuple):
method: str
url: str
headers: Mapping[str, str]
data: Optional[str] = None
json: Optional[Mapping[str, Any]] = None

def build_trusted_phone_numbers_request(context: _TrustedPhoneContextProvider) -> Request:
""" Builds a request for the list of trusted phone numbers for sms 2fa """

url = _auth_url(context.domain)

req = _InternalRequest(
method="GET",
url=url,
headers = {
**_oauth_const_headers(),
**_oauth_redirect_header(context.domain),
**_oauth_headers(context.oauth_session),
})
return req

def build_send_sms_code_request(context: _TrustedPhoneContextProvider, device_id: int) -> Request:
""" Builds a request for the list of trusted phone numbers for sms 2fa """

url = _auth_url(context.domain) + "/verify/phone"

json = {"phoneNumber":{"id":device_id},"mode":"sms"}

req = _InternalRequest(
method="PUT",
url=url,
headers = {
**_oauth_const_headers(),
**_oauth_redirect_header(context.domain),
**_oauth_headers(context.oauth_session),
** {"Content-type": "application/json"},
},
json = json)
return req

def build_verify_sms_code_request(context: _TrustedPhoneContextProvider, device_id: int, code: int) -> Request:
""" Builds a request for the list of trusted phone numbers for sms 2fa """

url = _auth_url(context.domain) + "/verify/phone/securitycode"

json = {"phoneNumber":{"id":device_id},"securityCode":{"code":str(code)},"mode":"sms"}

req = _InternalRequest(
method="POST",
url=url,
headers = {
**_oauth_const_headers(),
**_oauth_redirect_header(context.domain),
**_oauth_headers(context.oauth_session),
** {"Content-type": "application/json"},
** {"Accept": "application/json"},
},
json = json)
return req
10 changes: 1 addition & 9 deletions src/pyicloud_ipd/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import copy
import getpass
import os
from typing import Any, Callable, Dict, Optional, Sequence, TypeVar
from typing import Callable, Dict, Optional, Sequence, TypeVar
import keyring
import sys

from pyicloud_ipd.asset_version import AssetVersion
from pyicloud_ipd.version_size import AssetVersionSize, VersionSize
Expand Down Expand Up @@ -149,9 +147,3 @@ def disambiguate_filenames(_versions: Dict[VersionSize, AssetVersion], _sizes:Se

return _results

_Tin2 = TypeVar("_Tin2")
def get_property(prop_name: str, src: Dict[str, _Tin2]) -> Optional[_Tin2]:
return src.get(prop_name)

def parse_res(filename:str, res: Dict[str, Any]) -> AssetVersion:
raise NotImplementedError()
Loading

0 comments on commit 482119f

Please sign in to comment.