diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index 9dc5402f7..ae289647b 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -9,6 +9,9 @@ from re import match import http.cookiejar as cookielib import getpass +import srp +import base64 +import hashlib from requests import PreparedRequest, Request, Response @@ -190,26 +193,75 @@ def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) -> if not login_successful: LOGGER.debug("Authenticating as %s", self.user["accountName"]) - data = dict(self.user) - - data["rememberMe"] = True - data["trustTokens"] = [] - if self.session_data.get("trust_token"): - data["trustTokens"] = [self.session_data.get("trust_token")] - headers = self._get_auth_headers() - scnt = self.session_data.get("scnt") if scnt: headers["scnt"] = scnt - + session_id = self.session_data.get("session_id") if session_id: headers["X-Apple-ID-Session-Id"] = session_id + class SrpPassword(): + def __init__(self, password: str): + self.password = password + + def set_encrypt_info(self, salt: bytes, iterations: int, key_length: int): + self.salt = salt + self.iterations = iterations + self.key_length = key_length + + def encode(self): + password_hash = hashlib.sha256(self.password.encode('utf-8')).digest() + return hashlib.pbkdf2_hmac('sha256', password_hash, salt, iterations, key_length) + + srp_password = SrpPassword(self.user["password"]) + srp.rfc5054_enable() + srp.no_username_in_x() + usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048) + + uname, A = usr.start_authentication() + + data = { + 'a': base64.b64encode(A).decode(), + 'accountName': uname, + 'protocols': ['s2k', 's2k_fo'] + } + + try: + response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers) + response.raise_for_status() + except PyiCloudAPIResponseException as error: + msg = "Failed to initiate srp authentication." + raise PyiCloudFailedLoginException(msg, error) from error + + body = response.json() + + salt = base64.b64decode(body['salt']) + b = base64.b64decode(body['b']) + c = body['c'] + iterations = body['iteration'] + key_length = 32 + srp_password.set_encrypt_info(salt, iterations, key_length) + + m1 = usr.process_challenge( salt, b ) + m2 = usr.H_AMK + + data = { + "accountName": uname, + "c": c, + "m1": base64.b64encode(m1).decode(), + "m2": base64.b64encode(m2).decode(), + "rememberMe": True, + "trustTokens": [], + } + + if self.session_data.get("trust_token"): + data["trustTokens"] = [self.session_data.get("trust_token")] + try: self.session.post( - "%s/signin" % self.AUTH_ENDPOINT, + "%s/signin/complete" % self.AUTH_ENDPOINT, params={"isRememberMeEnabled": "true"}, data=json.dumps(data), headers=headers, @@ -284,7 +336,7 @@ def _validate_token(self) -> Dict[str, Any]: def _get_auth_headers(self, overrides: Optional[Dict[str, str]]=None) -> Dict[str, str]: headers = { - "Accept": "*/*", + "Accept": "application/json, text/javascript", "Content-Type": "application/json", "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", "X-Apple-OAuth-Client-Type": "firstPartyAuth",