Skip to content

Commit

Permalink
fix: new AppleID auth with srp
Browse files Browse the repository at this point in the history
  • Loading branch information
iowk committed Oct 25, 2024
1 parent 8b3adb0 commit 0b2cd58
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Unreleased

- fix: new AppleID auth with srp [#970](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/970)
- feature: when ran without parameters, `icloudpd` shows help [#963](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/963)
- fix: force_size should not skip subsequent sizes [#955](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/955)

Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ dependencies = [
"pytz==2024.1",
"certifi==2022.12.7",
"keyring==25.2.1",
"keyrings-alt==5.0.1"
"keyrings-alt==5.0.1",
"srp==1.0.21",
]

[project.optional-dependencies]
Expand Down
78 changes: 66 additions & 12 deletions src/pyicloud_ipd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from re import match
import http.cookiejar as cookielib
import getpass
import srp
import base64
import hashlib

from requests import PreparedRequest, Request, Response

Expand Down Expand Up @@ -190,30 +193,81 @@ def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) ->
if not login_successful:
LOGGER.debug("Authenticating as %s", self.user["accountName"])

data = dict(self.user)

data["rememberMe"] = True
data["trustTokens"] = []
if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

headers = self._get_auth_headers()

scnt = self.session_data.get("scnt")
if scnt:
headers["scnt"] = scnt

session_id = self.session_data.get("session_id")
if session_id:
headers["X-Apple-ID-Session-Id"] = session_id

class SrpPassword():
# srp uses the encoded password at process_challenge(), thus set_encrypt_info() should be called before that
def __init__(self, password: str):
self.pwd = password

def set_encrypt_info(self, salt: bytes, iterations: int):
self.salt = salt
self.iterations = iterations

def encode(self):
key_length = 32
return hashlib.pbkdf2_hmac('sha256', hashlib.sha256(self.pwd.encode()).digest(), self.salt, self.iterations, key_length)

# Step 1: client generates private key a (stored in srp.User) and public key A, sends to server
srp_password = SrpPassword(self.user["password"])
srp.rfc5054_enable()
srp.no_username_in_x()
usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256)
uname, A = usr.start_authentication()
data = {
'a': base64.b64encode(A).decode(),
'accountName': uname,
'protocols': ['s2k', 's2k_fo']
}

try:
response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers)
if response.status_code == 401:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Failed to initiate srp authentication."
raise PyiCloudFailedLoginException(msg, error) from error

# Step 2: server sends public key B, salt, and c to client
body = response.json()
salt = base64.b64decode(body['salt'])
b = base64.b64decode(body['b'])
c = body['c']
iterations = body['iteration']

# Step 3: client generates session key M1 and M2 with salt and b, sends to server
srp_password.set_encrypt_info(salt, iterations)
m1 = usr.process_challenge( salt, b )
m2 = usr.H_AMK

data = {
"accountName": uname,
"c": c,
"m1": base64.b64encode(m1).decode(),
"m2": base64.b64encode(m2).decode(),
"rememberMe": True,
"trustTokens": [],
}

if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

try:
self.session.post(
"%s/signin" % self.AUTH_ENDPOINT,
response = self.session.post(
"%s/signin/complete" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
if response.status_code == 401:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error
Expand Down Expand Up @@ -284,7 +338,7 @@ def _validate_token(self) -> Dict[str, Any]:

def _get_auth_headers(self, overrides: Optional[Dict[str, str]]=None) -> Dict[str, str]:
headers = {
"Accept": "*/*",
"Accept": "application/json, text/javascript",
"Content-Type": "application/json",
"X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
"X-Apple-OAuth-Client-Type": "firstPartyAuth",
Expand Down

0 comments on commit 0b2cd58

Please sign in to comment.