Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fallback to old raw password auth if srp auth fails #1018

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- fix: fallback to old raw password auth if srp auth fails [#975](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/975)

## 1.25.0 (2024-12-03)

- fix: failed to authenticate for accounts with srp s2k_fo auth protocol [#975](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/975)
Expand Down
201 changes: 115 additions & 86 deletions src/pyicloud_ipd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,93 +192,15 @@ def authenticate(self, force_refresh:bool=False, service:Optional[Any]=None) ->

if not login_successful:
LOGGER.debug("Authenticating as %s", self.user["accountName"])

headers = self._get_auth_headers()

class SrpPassword():
# srp uses the encoded password at process_challenge(), thus set_encrypt_info() should be called before that
def __init__(self, password: str):
self.pwd = password

def set_encrypt_info(self, protocol: str, salt: bytes, iterations: int) -> None:
self.protocol = protocol
self.salt = salt
self.iterations = iterations

def encode(self) -> bytes:
password_hash = hashlib.sha256(self.pwd.encode())
password_digest = password_hash.hexdigest().encode() if self.protocol == 's2k_fo' else password_hash.digest()
key_length = 32
return hashlib.pbkdf2_hmac('sha256', password_digest, salt, iterations, key_length)

# Step 1: client generates private key a (stored in srp.User) and public key A, sends to server
srp_password = SrpPassword(self.user["password"])
srp.rfc5054_enable()
srp.no_username_in_x()
usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048)
uname, A = usr.start_authentication()
data = {
'a': base64.b64encode(A).decode(),
'accountName': uname,
'protocols': ['s2k', 's2k_fo']
}

try:
response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers)
if response.status_code == 401:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Failed to initiate srp authentication."
raise PyiCloudFailedLoginException(msg, error) from error

# Step 2: server sends public key B, salt, and c to client
body = response.json()
salt = base64.b64decode(body['salt'])
b = base64.b64decode(body['b'])
c = body['c']
iterations = body['iteration']
protocol = body['protocol']

# Step 3: client generates session key M1 and M2 with salt and b, sends to server
srp_password.set_encrypt_info(protocol, salt, iterations)
m1 = usr.process_challenge( salt, b )
m2 = usr.H_AMK

data = {
"accountName": uname,
"c": c,
"m1": base64.b64encode(m1).decode(),
"m2": base64.b64encode(m2).decode(),
"rememberMe": True,
"trustTokens": [],
}

if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

try:
response = self.session.post(
"%s/signin/complete" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
if response.status_code == 409:
# requires 2FA
pass
elif response.status_code == 412:
# non 2FA account returns 412 "precondition no met"
headers = self._get_auth_headers()
response = self.session.post(
"%s/repair/complete" % self.AUTH_ENDPOINT,
data=json.dumps({}),
headers=headers,
)
elif response.status_code >= 400 and response.status_code < 600:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error
self._authenticate_srp()
except PyiCloudFailedLoginException as error:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is anything in payload we can use as a flag for srp rather than try-catch.

If try-catch is the only way, what other exceptions may be incorrectly channelled to non-srp path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the account I tried, the response for srp signin/complete is:

('Invalid email/password combination.', PyiCloudAPIResponseException('{\n  "serviceErrors" : [ {\n    "code" : "-20283",\n    "message" : "Enter the email or phone number and password for your Apple\xa0Account.",\n    "suppressDismissal" : false\n  } ]\n} (401)'))

Which is the same as when the password is incorrect.

This also means if the password is incorrect, it will be incorrectly channelled to non-srp path.

LOGGER.error("Failed to login with srp, falling back to old raw password authentication. Error: %s", error)
try:
self._authenticate_raw_password()
except PyiCloudFailedLoginException as error:
LOGGER.error("Failed to login with raw password. Error: %s", error)
raise error

self._authenticate_with_token()

Expand Down Expand Up @@ -332,6 +254,113 @@ def _authenticate_with_credentials_service(self, service: str) -> None:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error

def _authenticate_srp(self) -> None:
class SrpPassword():
# srp uses the encoded password at process_challenge(), thus set_encrypt_info() should be called before that
def __init__(self, password: str):
self.pwd = password

def set_encrypt_info(self, protocol: str, salt: bytes, iterations: int) -> None:
self.protocol = protocol
self.salt = salt
self.iterations = iterations

def encode(self) -> bytes:
password_hash = hashlib.sha256(self.pwd.encode())
password_digest = password_hash.hexdigest().encode() if self.protocol == 's2k_fo' else password_hash.digest()
key_length = 32
return hashlib.pbkdf2_hmac('sha256', password_digest, self.salt, self.iterations, key_length)

# Step 1: client generates private key a (stored in srp.User) and public key A, sends to server
srp_password = SrpPassword(self.user["password"])
srp.rfc5054_enable()
srp.no_username_in_x()
usr = srp.User(self.user["accountName"], srp_password, hash_alg=srp.SHA256, ng_type=srp.NG_2048)
uname, A = usr.start_authentication()
data = {
'a': base64.b64encode(A).decode(),
'accountName': uname,
'protocols': ['s2k', 's2k_fo']
}

headers = self._get_auth_headers()
try:
response = self.session.post("%s/signin/init" % self.AUTH_ENDPOINT, data=json.dumps(data), headers=headers)
if response.status_code == 401:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Failed to initiate srp authentication."
raise PyiCloudFailedLoginException(msg, error) from error

# Step 2: server sends public key B, salt, and c to client
body = response.json()
salt = base64.b64decode(body['salt'])
b = base64.b64decode(body['b'])
c = body['c']
iterations = body['iteration']
protocol = body['protocol']

# Step 3: client generates session key M1 and M2 with salt and b, sends to server
srp_password.set_encrypt_info(protocol, salt, iterations)
m1 = usr.process_challenge( salt, b )
m2 = usr.H_AMK

data = {
"accountName": uname,
"c": c,
"m1": base64.b64encode(m1).decode(),
"m2": base64.b64encode(m2).decode(),
"rememberMe": True,
"trustTokens": [],
}

if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

try:
response = self.session.post(
"%s/signin/complete" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
if response.status_code == 409:
# requires 2FA
pass
elif response.status_code == 412:
# non 2FA account returns 412 "precondition no met"
headers = self._get_auth_headers()
response = self.session.post(
"%s/repair/complete" % self.AUTH_ENDPOINT,
data=json.dumps({}),
headers=headers,
)
elif response.status_code >= 400 and response.status_code < 600:
raise PyiCloudAPIResponseException(response.text, str(response.status_code))
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error

def _authenticate_raw_password(self) -> None:
data = dict(self.user)

data["rememberMe"] = True
data["trustTokens"] = []
if self.session_data.get("trust_token"):
data["trustTokens"] = [self.session_data.get("trust_token")]

headers = self._get_auth_headers()
try:
self.session.post(
"%s/signin" % self.AUTH_ENDPOINT,
params={"isRememberMeEnabled": "true"},
data=json.dumps(data),
headers=headers,
)
except PyiCloudAPIResponseException as error:
msg = "Invalid email/password combination."
raise PyiCloudFailedLoginException(msg, error) from error

def _validate_token(self) -> Dict[str, Any]:
"""Checks if the current access token is still valid."""
LOGGER.debug("Checking session token validity")
Expand Down
33 changes: 33 additions & 0 deletions tests/test_authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,41 @@ def test_failed_auth(self) -> None:
"EC5646DE-9423-11E8-BF21-14109FE0B321",
)

self.assertIn(
"ERROR Failed to login with srp, falling back to old raw password authentication.",
self._caplog.text,
)
self.assertTrue("Invalid email/password combination." in str(context.exception))

def test_fallback_raw_password(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")

for dir in [base_dir, cookie_dir]:
recreate_path(dir)

with vcr.use_cassette(os.path.join(self.vcr_path, "fallback_raw_password.yml")): # noqa: SIM117
runner = CliRunner(env={"CLIENT_ID": "EC5646DE-9423-11E8-BF21-14109FE0B321"})
result = runner.invoke(
main,
[
"--username",
"[email protected]",
"--password",
"password1",
"--no-progress-bar",
"--cookie-directory",
cookie_dir,
"--auth-only",
],
)
self.assertIn(
"ERROR Failed to login with srp, falling back to old raw password authentication.",
self._caplog.text,
)
self.assertIn("INFO Authentication completed successfully", self._caplog.text)
assert result.exit_code == 0

def test_2sa_required(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])
cookie_dir = os.path.join(base_dir, "cookie")
Expand Down
49 changes: 49 additions & 0 deletions tests/vcr_cassettes/failed_auth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,53 @@ interactions:
status:
code: 401
message: ''
- request:
body: !!python/unicode '{"accountName": "bad_username", "password": "bad_password", "rememberMe":
true, "trustTokens": []}'
headers:
Accept: ['*/*']
Accept-Encoding: ['gzip, deflate']
Connection: ['keep-alive']
Content-Length: ['98']
Content-Type: ['application/json']
Origin: ['https://www.icloud.com']
Referer: ['https://www.icloud.com/']
User-Agent: ['Opera/9.52 (X11; Linux i686; U; en)']
X-Apple-OAuth-Client-Id: ['d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d']
X-Apple-OAuth-Client-Type: ['firstPartyAuth']
X-Apple-OAuth-Redirect-URI: ['https://www.icloud.com']
X-Apple-OAuth-Require-Grant-Code: ['true']
X-Apple-OAuth-Response-Mode: ['web_message']
X-Apple-OAuth-Response-Type: ['code']
X-Apple-OAuth-State: ['EC5646DE-9423-11E8-BF21-14109FE0B321']
X-Apple-Widget-Key: ['d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d']
method: POST
uri: https://idmsa.apple.com/appleauth/auth/signin?isRememberMeEnabled=true
response:
body:
string: !!python/unicode '{}'
headers:
Cache-Control:
- 'no-cache'
- 'no-store'
Connection: ['keep-alive']
Content-Type: ['text/html;charset=UTF-8']
Date: ['Fri, 15 Dec 2023 17:28:03 GMT']
Pragma: ['no-cache']
Referrer-Policy: ['origin']
Server: ['Apple']
Strict-Transport-Security: ['max-age=31536000; includeSubDomains; preload']
Transfer-Encoding: ['chunked']
X-Apple-I-Request-ID: ['12345678-1234-1234-1234-123456789012']
X-Apple-I-Rscd: ['401']
X-BuildVersion: ['R4_1']
X-Content-Type-Options: ['nosniff']
X-FRAME-OPTIONS: ['DENY']
X-XSS-Protection: ['1; mode=block']
content-length: ['23705']
scnt: ['scnt-1234567890']
vary: ['accept-encoding']
status:
code: 200
message: ''
version: 1
Loading
Loading