Skip to content

Commit

Permalink
fix: fallback to old raw password auth if srp auth fails (#1018)
Browse files Browse the repository at this point in the history
  • Loading branch information
iowk authored Dec 27, 2024
1 parent 96f9451 commit b700382
Show file tree
Hide file tree
Showing 5 changed files with 431 additions and 86 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- fix: fallback to old raw password auth if srp auth fails [#975](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/975)

## 1.25.0 (2024-12-03)

- fix: failed to authenticate for accounts with srp s2k_fo auth protocol [#975](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/975)
Expand Down
201 changes: 115 additions & 86 deletions src/pyicloud_ipd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,93 +192,15 @@ def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) ->

if not login_successful:
LOGGER.debug("Authenticating as %s", self.user["accountName"])

headers = self._get_auth_headers()

class SrpPassword():
# srp uses the encoded password at process_challenge(), thus set_encrypt_info() should be called before that
def __init__(self, password: str):
self.pwd = password

def set_encrypt_info(self, protocol: str, salt: bytes, iterations: int) -> None:
self.protocol = protocol
self.salt = salt
self.iterations = iterations

def encode(self) -> bytes:
password_hash = hashlib.sha256(self.pwd.encode())
password_digest = password_hash.hexdigest().encode() if self.protocol == 's2k_fo' else password_hash.digest()
key_length = 32
return hashlib.pbkdf2_hmac('sha256', password_digest, salt, iterations, key_length)

# Step 1: client generates private key a (stored in srp.User) and public key A, sends to server
srp_password = SrpPassword(self.user["password"])
srp.rfc5054_enable()
srp.no_username_in_x()
usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048)
uname, A = usr.start_authentication()
data = {
'a': base64.b64encode(A).decode(),
'accountName': uname,
'protocols': ['s2k', 's2k_fo']
}

try:
response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers)
if response.status_code == 401:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Failed to initiate srp authentication."
raise PyiCloudFailedLoginException(msg, error) from error

# Step 2: server sends public key B, salt, and c to client
body = response.json()
salt = base64.b64decode(body['salt'])
b = base64.b64decode(body['b'])
c = body['c']
iterations = body['iteration']
protocol = body['protocol']

# Step 3: client generates session key M1 and M2 with salt and b, sends to server
srp_password.set_encrypt_info(protocol, salt, iterations)
m1 = usr.process_challenge( salt, b )
m2 = usr.H_AMK

data = {
"accountName": uname,
"c": c,
"m1": base64.b64encode(m1).decode(),
"m2": base64.b64encode(m2).decode(),
"rememberMe": True,
"trustTokens": [],
}

if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

try:
response = self.session.post(
"%s/signin/complete" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
if response.status_code == 409:
# requires 2FA
pass
elif response.status_code == 412:
# non 2FA account returns 412 "precondition no met"
headers = self._get_auth_headers()
response = self.session.post(
"%s/repair/complete" % self.AUTH_ENDPOINT,
data=json.dumps({}),
headers=headers,
)
elif response.status_code >= 400 and response.status_code < 600:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error
self._authenticate_srp()
except PyiCloudFailedLoginException as error:
LOGGER.error("Failed to login with srp, falling back to old raw password authentication. Error: %s", error)
try:
self._authenticate_raw_password()
except PyiCloudFailedLoginException as error:
LOGGER.error("Failed to login with raw password. Error: %s", error)
raise error

self._authenticate_with_token()

Expand Down Expand Up @@ -332,6 +254,113 @@ def _authenticate_with_credentials_service(self, service: str) -> None:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error

def _authenticate_srp(self) -> None:
class SrpPassword():
# srp uses the encoded password at process_challenge(), thus set_encrypt_info() should be called before that
def __init__(self, password: str):
self.pwd = password

def set_encrypt_info(self, protocol: str, salt: bytes, iterations: int) -> None:
self.protocol = protocol
self.salt = salt
self.iterations = iterations

def encode(self) -> bytes:
password_hash = hashlib.sha256(self.pwd.encode())
password_digest = password_hash.hexdigest().encode() if self.protocol == 's2k_fo' else password_hash.digest()
key_length = 32
return hashlib.pbkdf2_hmac('sha256', password_digest, self.salt, self.iterations, key_length)

# Step 1: client generates private key a (stored in srp.User) and public key A, sends to server
srp_password = SrpPassword(self.user["password"])
srp.rfc5054_enable()
srp.no_username_in_x()
usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048)
uname, A = usr.start_authentication()
data = {
'a': base64.b64encode(A).decode(),
'accountName': uname,
'protocols': ['s2k', 's2k_fo']
}

headers = self._get_auth_headers()
try:
response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers)
if response.status_code == 401:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Failed to initiate srp authentication."
raise PyiCloudFailedLoginException(msg, error) from error

# Step 2: server sends public key B, salt, and c to client
body = response.json()
salt = base64.b64decode(body['salt'])
b = base64.b64decode(body['b'])
c = body['c']
iterations = body['iteration']
protocol = body['protocol']

# Step 3: client generates session key M1 and M2 with salt and b, sends to server
srp_password.set_encrypt_info(protocol, salt, iterations)
m1 = usr.process_challenge( salt, b )
m2 = usr.H_AMK

data = {
"accountName": uname,
"c": c,
"m1": base64.b64encode(m1).decode(),
"m2": base64.b64encode(m2).decode(),
"rememberMe": True,
"trustTokens": [],
}

if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

try:
response = self.session.post(
"%s/signin/complete" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
if response.status_code == 409:
# requires 2FA
pass
elif response.status_code == 412:
# non 2FA account returns 412 "precondition no met"
headers = self._get_auth_headers()
response = self.session.post(
"%s/repair/complete" % self.AUTH_ENDPOINT,
data=json.dumps({}),
headers=headers,
)
elif response.status_code >= 400 and response.status_code < 600:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error

def _authenticate_raw_password(self) -> None:
data = dict(self.user)

data["rememberMe"] = True
data["trustTokens"] = []
if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

headers = self._get_auth_headers()
try:
self.session.post(
"%s/signin" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error

def _validate_token(self) -> Dict[str, Any]:
"""Checks if the current access token is still valid."""
LOGGER.debug("Checking session token validity")
Expand Down
33 changes: 33 additions & 0 deletions tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,41 @@ def test_failed_auth(self) -> None:
"EC5646DE-9423-11E8-BF21-14109FE0B321",
)

self.assertIn(
"ERROR Failed to login with srp, falling back to old raw password authentication.",
self._caplog.text,
)
self.assertTrue("Invalid email/password combination." in str(context.exception))

def test_fallback_raw_password(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")

for dir in [base_dir, cookie_dir]:
recreate_path(dir)

with vcr.use_cassette(os.path.join(self.vcr_path, "fallback_raw_password.yml")): # noqa: SIM117
runner = CliRunner(env={"CLIENT_ID": "EC5646DE-9423-11E8-BF21-14109FE0B321"})
result = runner.invoke(
main,
[
"--username",
"[email protected]",
"--password",
"password1",
"--no-progress-bar",
"--cookie-directory",
cookie_dir,
"--auth-only",
],
)
self.assertIn(
"ERROR Failed to login with srp, falling back to old raw password authentication.",
self._caplog.text,
)
self.assertIn("INFO Authentication completed successfully", self._caplog.text)
assert result.exit_code == 0

def test_2sa_required(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
Expand Down
49 changes: 49 additions & 0 deletions tests/vcr_cassettes/failed_auth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,53 @@ interactions:
status:
code: 401
message: ''
- request:
body: !!python/unicode '{"accountName": "bad_username", "password": "bad_password", "rememberMe":
true, "trustTokens": []}'
headers:
Accept: ['*/*']
Accept-Encoding: ['gzip, deflate']
Connection: ['keep-alive']
Content-Length: ['98']
Content-Type: ['application/json']
Origin: ['https://www.icloud.com']
Referer: ['https://www.icloud.com/']
User-Agent: ['Opera/9.52 (X11; Linux i686; U; en)']
X-Apple-OAuth-Client-Id: ['d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d']
X-Apple-OAuth-Client-Type: ['firstPartyAuth']
X-Apple-OAuth-Redirect-URI: ['https://www.icloud.com']
X-Apple-OAuth-Require-Grant-Code: ['true']
X-Apple-OAuth-Response-Mode: ['web_message']
X-Apple-OAuth-Response-Type: ['code']
X-Apple-OAuth-State: ['EC5646DE-9423-11E8-BF21-14109FE0B321']
X-Apple-Widget-Key: ['d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d']
method: POST
uri: https://idmsa.apple.com/appleauth/auth/signin?isRememberMeEnabled=true
response:
body:
string: !!python/unicode '{}'
headers:
Cache-Control:
- 'no-cache'
- 'no-store'
Connection: ['keep-alive']
Content-Type: ['text/html;charset=UTF-8']
Date: ['Fri, 15 Dec 2023 17:28:03 GMT']
Pragma: ['no-cache']
Referrer-Policy: ['origin']
Server: ['Apple']
Strict-Transport-Security: ['max-age=31536000; includeSubDomains; preload']
Transfer-Encoding: ['chunked']
X-Apple-I-Request-ID: ['12345678-1234-1234-1234-123456789012']
X-Apple-I-Rscd: ['401']
X-BuildVersion: ['R4_1']
X-Content-Type-Options: ['nosniff']
X-FRAME-OPTIONS: ['DENY']
X-XSS-Protection: ['1; mode=block']
content-length: ['23705']
scnt: ['scnt-1234567890']
vary: ['accept-encoding']
status:
code: 200
message: ''
version: 1
Loading

0 comments on commit b700382

Please sign in to comment.