Skip to content

Commit

Permalink
dump input when decoding of filename is failing (#942)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyNikiforov authored Sep 1, 2024
1 parent c137424 commit 5d417e0
Show file tree
Hide file tree
Showing 8 changed files with 23,754 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/produce-artifacts.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
with:
concurrent_skipping: 'same_content_newer'
skip_after_successful_duplicate: 'true'
paths_ignore: '["**/*.md", "examples/**"]'
paths_ignore: '["**/*.md", "examples/**", "tests/**"]'
do_not_skip: '["workflow_dispatch", "schedule"]'

get_version:
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- dump encoded filename in exception when there is an error in decoding it [#935](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/935) and [ref](https://github.com/boredazfcuk/docker-icloudpd/issues/641)

## 1.23.1 (2024-08-22)

- fix: use a-z for sms mfa index to disambiguate with mfa code with leading zeros [#925](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/925)
Expand Down
25 changes: 24 additions & 1 deletion src/foundation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime
from typing import NamedTuple
from typing import Callable, NamedTuple, TypeVar

import pytz
from tzlocal import get_localzone
Expand All @@ -25,3 +25,26 @@ def version_info_formatted() -> str:
get_localzone()
)
return f"version:{vi.version}, commit sha:{vi.commit_sha}, commit timestamp:{ts:%c %Z}"


def bytes_decode(encoding: str) -> Callable[[bytes], str]:
def _internal(inp: bytes) -> str:
return inp.decode(encoding)

return _internal


T_in = TypeVar("T_in")
T_out = TypeVar("T_out")


def wrap_param_in_exception(func: Callable[[T_in], T_out]) -> Callable[[T_in], T_out]:
"""Decorator to wrap excpetion into ValueError with the dump of the input parameter"""

def _internal(input: T_in) -> T_out:
try:
return func(input)
except Exception as err:
raise ValueError(f"Invalid Input: {input!r}") from err

return _internal
21 changes: 17 additions & 4 deletions src/pyicloud_ipd/services/photos.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import re

from datetime import datetime
from typing import Any, Callable, Dict, Generator, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Dict, Generator, Optional, Sequence, Tuple, TypeVar, Union
import typing

from requests import Response
from foundation import wrap_param_in_exception, bytes_decode
from pyicloud_ipd.asset_version import AssetVersion
from pyicloud_ipd.exceptions import PyiCloudServiceNotActivatedException
from pyicloud_ipd.exceptions import PyiCloudAPIResponseException
Expand All @@ -22,6 +23,7 @@
from pyicloud_ipd.file_match import FileMatchPolicy
from pyicloud_ipd.raw_policy import RawTreatmentPolicy
from pyicloud_ipd.session import PyiCloudSession
from pyicloud_ipd.utils import compose
from pyicloud_ipd.version_size import AssetVersionSize, LivePhotoVersionSize, VersionSize

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -611,9 +613,20 @@ def id(self) -> str:
def filename(self) -> str:
fields = self._master_record['fields']
if 'filenameEnc' in fields and 'value' in fields['filenameEnc']:
_filename = self._service.filename_cleaner(base64.b64decode(
fields['filenameEnc']['value']
).decode('utf-8'))
_decode_base64 = wrap_param_in_exception(base64.b64decode)
_decode_bytes = wrap_param_in_exception(bytes_decode('utf-8'))
_filename = compose(
self._service.filename_cleaner,
compose(
_decode_bytes,
_decode_base64
)
)(fields['filenameEnc']['value'])

# _filename = self._service.filename_cleaner(base64.b64decode(
# fields['filenameEnc']['value']
# ).decode('utf-8'))

if self._service.file_match_policy == FileMatchPolicy.NAME_ID7:
_f, _e = os.path.splitext(_filename)
_a = base64.b64encode(self.id.encode('utf-8')).decode('ascii')[0:7]
Expand Down
82 changes: 81 additions & 1 deletion tests/test_download_photos.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
import sys
from typing import Any, NoReturn, Optional, Sequence, Tuple
from typing import Any, List, NoReturn, Optional, Sequence, Tuple
from unittest import TestCase, mock
from unittest.mock import ANY, PropertyMock, call

Expand Down Expand Up @@ -2085,3 +2085,83 @@ def test_download_raw_photos_policy_as_is(self) -> None:
self.assertIn("INFO All photos have been downloaded", self._caplog.text)

assert result.exit_code == 0

def test_download_bad_filename_base64_encoding(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])

files_to_create = [
("2018/07/30", "IMG_7408.JPG", 1151066),
("2018/07/30", "IMG_7407.JPG", 656257),
]

files_to_download: List[Tuple[str, str]] = [
# <>:"/\|?* -- windows
# / & \0x00 -- linux
# aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUE
# ("2018/07/31", "i_n v_a_l_i_d_p_a_t_h_.JPG")
]

data_dir, result = run_icloudpd_test(
self.assertEqual,
self.vcr_path,
base_dir,
"listing_photos_bad_filename_base64_encoding.yml",
files_to_create,
files_to_download,
[
"--username",
"[email protected]",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
"1",
],
)

self.assertIsInstance(result.exception, ValueError)
# self.assertEqual(result.exception, ValueError("Invalid Input: 'aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUE'"))

def test_download_bad_filename_utf8_encoding(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])

files_to_create = [
("2018/07/30", "IMG_7408.JPG", 1151066),
("2018/07/30", "IMG_7407.JPG", 656257),
]

files_to_download: List[Tuple[str, str]] = [
# <>:"/\|?* -- windows
# / & \0x00 -- linux
# aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUE -> abcdefgh
# ("2018/07/31", "i_n v_a_l_i_d_p_a_t_h_.JPG")
]

data_dir, result = run_icloudpd_test(
self.assertEqual,
self.vcr_path,
base_dir,
"listing_photos_bad_filename_utf8_encoding.yml",
files_to_create,
files_to_download,
[
"--username",
"[email protected]",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--threads-num",
"1",
],
)

self.assertIsInstance(result.exception, ValueError)
# self.assertEqual(result.exception, ValueError("Invalid Input: b'i\\xb7\\x1dy\\xf8!'"))
82 changes: 81 additions & 1 deletion tests/test_download_photos_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import os
import sys
from typing import Any, NoReturn, Optional, Sequence, Tuple
from typing import Any, List, NoReturn, Optional, Sequence, Tuple
from unittest import TestCase, mock
from unittest.mock import ANY, PropertyMock, call

Expand Down Expand Up @@ -2073,3 +2073,83 @@ def test_download_raw_photos_policy_as_is_name_id7(self) -> None:
self.assertIn("INFO All photos have been downloaded", self._caplog.text)

assert result.exit_code == 0

def test_download_bad_filename_base64_encoding_name_id7(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])

files_to_create = [
("2018/07/30", "IMG_7408_QVI4T2l.JPG", 1151066),
("2018/07/30", "IMG_7407_QVovd0F.JPG", 656257),
]

files_to_download: List[Tuple[str, str]] = [
# <>:"/\|?* -- windows
# / & \0x00 -- linux
# aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUE
# ("2018/07/31", "i_n v_a_l_i_d_p_a_t_h_.JPG")
]

data_dir, result = run_icloudpd_test(
self.assertEqual,
self.vcr_path,
base_dir,
"listing_photos_bad_filename_base64_encoding.yml",
files_to_create,
files_to_download,
[
"--username",
"[email protected]",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--file-match-policy",
"name-id7",
],
)

self.assertIsInstance(result.exception, ValueError)
# ValueError("Invalid Input: 'aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUE'")

def test_download_bad_filename_utf8_encoding_name_id7(self) -> None:
base_dir = os.path.join(self.fixtures_path, inspect.stack()[0][3])

files_to_create = [
("2018/07/30", "IMG_7408_QVI4T2l.JPG", 1151066),
("2018/07/30", "IMG_7407_QVovd0F.JPG", 656257),
]

files_to_download: List[Tuple[str, str]] = [
# <>:"/\|?* -- windows
# / & \0x00 -- linux
# aS9uIHY6YQBsKmk/ZFxwPGE+dCJofC5KUE -> abcdefgh
# ("2018/07/31", "i_n v_a_l_i_d_p_a_t_h_.JPG")
]

data_dir, result = run_icloudpd_test(
self.assertEqual,
self.vcr_path,
base_dir,
"listing_photos_bad_filename_utf8_encoding.yml",
files_to_create,
files_to_download,
[
"--username",
"[email protected]",
"--password",
"password1",
"--recent",
"5",
"--skip-videos",
"--skip-live-photos",
"--no-progress-bar",
"--file-match-policy",
"name-id7",
],
)

self.assertIsInstance(result.exception, ValueError)
# self.assertEqual(result.exception, ValueError("Invalid Input: b'i\\xb7\\x1dy\\xf8!'"))
Loading

0 comments on commit 5d417e0

Please sign in to comment.