diff --git a/pyproject.toml b/pyproject.toml index 18b270eb9..18a3a6aac 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -28,6 +28,7 @@ dependencies = [ "click==8.1.6", "tqdm==4.66.4", "piexif==1.1.3", + "python-dateutil==2.9.0.post0", "urllib3==1.26.16", "typing_extensions==4.11.0", "Flask==3.0.3", @@ -68,6 +69,7 @@ test = [ "pytest-timeout==2.1.0", "pytest-xdist==3.3.1", "mypy==1.10.1", + "types-python-dateutil==2.9.0.20241003", "types-pytz==2024.1.0.20240417", "types-tzlocal==5.1.0.1", "types-requests==2.31.0.2", diff --git a/src/icloudpd/autodelete.py b/src/icloudpd/autodelete.py index 018e0637d..5937bc940 100644 --- a/src/icloudpd/autodelete.py +++ b/src/icloudpd/autodelete.py @@ -73,9 +73,11 @@ def autodelete_photos( for _size, _version in disambiguate_filenames(media.versions, _sizes).items(): if _size in [AssetVersionSize.ALTERNATIVE, AssetVersionSize.ADJUSTED]: paths.add(os.path.normpath(local_download_path(_version.filename, download_dir))) + paths.add(os.path.normpath(local_download_path(_version.filename, download_dir)) + ".xmp") for _size, _version in media.versions.items(): if _size not in [AssetVersionSize.ALTERNATIVE, AssetVersionSize.ADJUSTED]: paths.add(os.path.normpath(local_download_path(_version.filename, download_dir))) + paths.add(os.path.normpath(local_download_path(_version.filename, download_dir)) + ".xmp") for path in paths: if os.path.exists(path): logger.debug("Deleting %s...", path) diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index 319dffa58..af3fc27c4 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -64,6 +64,7 @@ from icloudpd.server import serve_app from icloudpd.status import Status, StatusExchange from icloudpd.string_helpers import truncate_middle +from icloudpd.xmp_sidecar import generate_xmp_file def build_filename_cleaner( @@ -377,6 +378,11 @@ def report_version(ctx: click.Context, _param: click.Parameter, value: bool) -> help="Don't download any live photos (default: Download live photos)", is_flag=True, ) +@click.option( + "--xmp-sidecar", + help="Export additional data as XMP sidecar files (default: don't export)", + is_flag=True, +) @click.option( "--force-size", help="Only download the requested size (`adjusted` and `alternate` will not be forced)" @@ -583,6 +589,7 @@ def main( list_libraries: bool, skip_videos: bool, skip_live_photos: bool, + xmp_sidecar: bool, force_size: bool, auto_delete: bool, only_print_filenames: bool, @@ -694,6 +701,7 @@ def main( list_libraries=list_libraries, skip_videos=skip_videos, skip_live_photos=skip_live_photos, + xmp_sidecar=xmp_sidecar, force_size=force_size, auto_delete=auto_delete, only_print_filenames=only_print_filenames, @@ -756,6 +764,7 @@ def main( live_photo_size, dry_run, file_match_policy, + xmp_sidecar, ) if directory is not None else (lambda _s: lambda _c, _p: False), @@ -812,6 +821,7 @@ def download_builder( live_photo_size: LivePhotoVersionSize, dry_run: bool, file_match_policy: FileMatchPolicy, + xmp_sidecar: bool, ) -> Callable[[PyiCloudService], Callable[[Counter, PhotoAsset], bool]]: """factory for downloader""" @@ -960,6 +970,9 @@ def download_photo_(counter: Counter, photo: PhotoAsset) -> bool: download.set_utime(download_path, created_date) logger.info("Downloaded %s", truncated_path) + if xmp_sidecar: + generate_xmp_file(logger, download_path, photo._asset_record) + # Also download the live photo if present if not skip_live_photos: lp_size = live_photo_size diff --git a/src/icloudpd/config.py b/src/icloudpd/config.py index 4a25203e2..cbede5d56 100644 --- a/src/icloudpd/config.py +++ b/src/icloudpd/config.py @@ -24,6 +24,7 @@ def __init__( list_libraries: bool, skip_videos: bool, skip_live_photos: bool, + xmp_sidecar: bool, force_size: bool, auto_delete: bool, only_print_filenames: bool, diff --git a/src/icloudpd/xmp_sidecar.py b/src/icloudpd/xmp_sidecar.py new file mode 100644 index 000000000..f604be75c --- /dev/null +++ b/src/icloudpd/xmp_sidecar.py @@ -0,0 +1,295 @@ +"""Generate XMP sidecar file from photo asset record""" + +from __future__ import annotations + +import base64 +import json +import logging +import os +import plistlib +import zlib +from datetime import datetime +from typing import Any, NamedTuple +from xml.etree import ElementTree + +import dateutil.tz +from foundation import version_info + +exif_tool = None + + +class XMPMetadata(NamedTuple): + XMPToolkit: str + Title: str | None + Description: str | None + Orientation: int | None + Make: str | None + DigitalSourceType: str | None + Keywords: list[str] | None + GPSAltitude: float | None + GPSLatitude: float | None + GPSLongitude: float | None + GPSSpeed: float | None + GPSTimeStamp: datetime | None + CreateDate: datetime | None + Rating: int | None + + +def generate_xmp_file( + logger: logging.Logger, download_path: str, asset_record: dict[str, Any] +) -> None: + sidecar_path: str = download_path + ".xmp" + can_write_file: bool = True + if os.path.exists(sidecar_path) and os.path.getsize(sidecar_path) != 0: + can_write_file = False + try: + root = ElementTree.parse(sidecar_path).getroot() + xmptk_value = root.attrib.get("{adobe:ns:meta/}xmptk") + if not xmptk_value or not xmptk_value.startswith("icloudpd"): + logger.info(f"Not overwriting XMP file {sidecar_path} created by {xmptk_value}") + else: + can_write_file = True + except ElementTree.ParseError as e: + logger.info(f"Not overwriting XMP file {sidecar_path} due to parser error: {e}") + + # decode asset record fields + # for k in asset_record['fields']: + # if asset_record["fields"][k]['type'] == "ENCRYPTED_BYTES": + # try: + # asset_record["fields"][k]['decoded'] = plistlib.loads(base64.b64decode(asset_record['fields'][k]['value']), fmt=plistlib.FMT_BINARY) + # except plistlib.InvalidFileException: + # try: + # asset_record["fields"][k]['decoded'] = json.loads(zlib.decompress(base64.b64decode(asset_record['fields'][k]['value']),-zlib.MAX_WBITS)) + # except Exception as e: + # asset_record["fields"][k]['decoded'] = base64.b64decode(asset_record['fields'][k]['value']).decode("utf-8") + # json.dump(asset_record["fields"], open(download_path + ".ar.json", "w"), indent=4, default=str, sort_keys=True) + + if can_write_file: + xmp_metadata: XMPMetadata = build_metadata(asset_record) + xml_doc: ElementTree.Element = generate_xml(xmp_metadata) + # Write the XML to the file + with open(sidecar_path, "wb") as f: + f.write(ElementTree.tostring(xml_doc, encoding="utf-8", xml_declaration=True)) + + +def build_metadata(asset_record: dict[str, Any]) -> XMPMetadata: + """Build XMP metadata from asset record""" + + title = None + if "captionEnc" in asset_record["fields"]: + title = base64.b64decode(asset_record["fields"]["captionEnc"]["value"]).decode("utf-8") + + description = None + if "extendedDescEnc" in asset_record["fields"]: + description = base64.b64decode(asset_record["fields"]["extendedDescEnc"]["value"]).decode( + "utf-8" + ) + + orientation = None + if "adjustmentSimpleDataEnc" in asset_record["fields"]: + adjustments = json.loads( + zlib.decompress( + base64.b64decode(asset_record["fields"]["adjustmentSimpleDataEnc"]["value"]), + -zlib.MAX_WBITS, + ) + ) + if "metadata" in adjustments and "orientation" in adjustments["metadata"]: + orientation = adjustments["metadata"]["orientation"] + + make, digital_source_type = None, None + if ( + "assetSubtypeV2" in asset_record["fields"] + and int(asset_record["fields"]["assetSubtypeV2"]["value"]) == 3 + ): + make = "Screenshot" + digital_source_type = "screenCapture" + + keywords = None + if "keywordsEnc" in asset_record["fields"] and len(asset_record["fields"]["keywordsEnc"]) > 0: + keywords = plistlib.loads( + base64.b64decode(asset_record["fields"]["keywordsEnc"]["value"]), + fmt=plistlib.FMT_BINARY, + ) + + if "locationEnc" in asset_record["fields"]: + location = plistlib.loads( + base64.b64decode(asset_record["fields"]["locationEnc"]["value"]), + fmt=plistlib.FMT_BINARY, + ) + gps_altitude = location.get("alt") + gps_latitude = location.get("lat") + gps_longitude = location.get("lon") + gps_speed = location.get("speed") + gps_timestamp = ( + location.get("timestamp") if isinstance(location.get("timestamp"), datetime) else None + ) + else: + gps_altitude, gps_latitude, gps_longitude, gps_speed, gps_timestamp = ( + None, + None, + None, + None, + None, + ) + + create_date = None + if "assetDate" in asset_record["fields"]: + timezone_offset = 0 + if "timeZoneOffset" in asset_record["fields"]: + timezone_offset = asset_record["fields"]["timeZoneOffset"]["value"] + create_date = datetime.fromtimestamp( + int(asset_record["fields"]["assetDate"]["value"]) / 1000, + tz=dateutil.tz.tzoffset(None, timezone_offset), + ) + + rating = None + # Hidden or Deleted Photos should be marked as rejected (needs running as --album "Hidden" or --album "Recently Deleted") + if ( + "isHidden" in asset_record["fields"] and asset_record["fields"]["isHidden"]["value"] == 1 + ) or ( + "isDeleted" in asset_record["fields"] and asset_record["fields"]["isDeleted"]["value"] == 1 + ): + rating = -1 # -1 means rejected: https://www.iptc.org/std/photometadata/specification/IPTC-PhotoMetadata#image-rating + # only mark photo as favorite if not hidden or deleted + elif asset_record["fields"]["isFavorite"]["value"] == 1: + rating = 5 + + return XMPMetadata( + XMPToolkit="icloudpd " + version_info.version + "+" + version_info.commit_sha, + Title=title, + Description=description, + Orientation=orientation, + Make=make, + DigitalSourceType=digital_source_type, + Keywords=keywords, + GPSAltitude=gps_altitude, + GPSLatitude=gps_latitude, + GPSLongitude=gps_longitude, + GPSSpeed=gps_speed, + GPSTimeStamp=gps_timestamp, + CreateDate=create_date, + Rating=rating, + ) + + +def generate_xml(metadata: XMPMetadata) -> ElementTree.Element: + # Create the root element + xml_doc = ElementTree.Element( + "x:xml_doc", {"xmlns:x": "adobe:ns:meta/", "x:xmptk": metadata.XMPToolkit} + ) + + # Create the RDF element + rdf = ElementTree.SubElement( + xml_doc, "rdf:RDF", {"xmlns:rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#"} + ) + + # Create the Description elements + description_dc = ElementTree.Element( + "rdf:Description", + { + "rdf:about": "", + "xmlns:dc": "http://purl.org/dc/elements/1.1/", + }, + ) + description_exif = ElementTree.Element( + "rdf:Description", + { + "rdf:about": "", + "xmlns:exif": "http://ns.adobe.com/exif/1.0/", + }, + ) + description_iptc4xmpext = ElementTree.Element( + "rdf:Description", + { + "rdf:about": "", + "xmlns:Iptc4xmpExt": "http://iptc.org/std/Iptc4xmpExt/2008-02-29/", + }, + ) + description_photoshop = ElementTree.Element( + "rdf:Description", + { + "rdf:about": "", + "xmlns:photoshop": "http://ns.adobe.com/photoshop/1.0/", + }, + ) + description_tiff = ElementTree.Element( + "rdf:Description", + { + "rdf:about": "", + "xmlns:tiff": "http://ns.adobe.com/tiff/1.0/", + }, + ) + description_xmp = ElementTree.Element( + "rdf:Description", + { + "rdf:about": "", + "xmlns:xmp": "http://ns.adobe.com/xap/1.0/", + }, + ) + if metadata.Title: + ElementTree.SubElement(description_dc, "dc:title").text = metadata.Title + if metadata.Description: + ElementTree.SubElement(description_dc, "dc:description").text = metadata.Description + + if metadata.Orientation: + ElementTree.SubElement(description_tiff, "tiff:Orientation").text = str( + metadata.Orientation + ) + if metadata.Make: + ElementTree.SubElement(description_tiff, "tiff:Make").text = metadata.Make + if metadata.DigitalSourceType: + ElementTree.SubElement( + description_iptc4xmpext, "Iptc4xmpExt:DigitalSourceType" + ).text = metadata.DigitalSourceType + + if metadata.Keywords: + subject = ElementTree.SubElement(description_dc, "dc:subject") + seq = ElementTree.SubElement(subject, "rdf:Seq") + for keyword in metadata.Keywords: + ElementTree.SubElement(seq, "rdf:li").text = keyword + + if metadata.GPSAltitude: + ElementTree.SubElement(description_exif, "exif:GPSAltitude").text = str( + metadata.GPSAltitude + ) + if metadata.GPSLatitude: + ElementTree.SubElement(description_exif, "exif:GPSLatitude").text = str( + metadata.GPSLatitude + ) + if metadata.GPSLongitude: + ElementTree.SubElement(description_exif, "exif:GPSLongitude").text = str( + metadata.GPSLongitude + ) + if metadata.GPSSpeed: + ElementTree.SubElement(description_exif, "exif:GPSSpeed").text = str(metadata.GPSSpeed) + if metadata.GPSTimeStamp: + ElementTree.SubElement( + description_exif, "exif:GPSTimeStamp" + ).text = metadata.GPSTimeStamp.strftime("%Y-%m-%dT%H:%M:%S%z") + + if metadata.CreateDate: + ElementTree.SubElement( + description_xmp, "xmp:CreateDate" + ).text = metadata.CreateDate.strftime("%Y-%m-%dT%H:%M:%S%z") + ElementTree.SubElement( + description_photoshop, "photoshop:DateCreated" + ).text = metadata.CreateDate.strftime( + "%Y-%m-%dT%H:%M:%S%z" + ) # Apple Photos uses this field when exporting an XMP sidecar + + if metadata.Rating: + ElementTree.SubElement(description_xmp, "xmp:Rating").text = str(metadata.Rating) + if len(list(description_dc)) > 0: + rdf.append(description_dc) + if len(list(description_exif)) > 0: + rdf.append(description_exif) + if len(list(description_iptc4xmpext)) > 0: + rdf.append(description_iptc4xmpext) + if len(list(description_photoshop)) > 0: + rdf.append(description_photoshop) + if len(list(description_tiff)) > 0: + rdf.append(description_tiff) + if len(list(description_xmp)) > 0: + rdf.append(description_xmp) + + return xml_doc diff --git a/src/pyicloud_ipd/services/photos.py b/src/pyicloud_ipd/services/photos.py index 7322ba31c..c3e12da44 100644 --- a/src/pyicloud_ipd/services/photos.py +++ b/src/pyicloud_ipd/services/photos.py @@ -509,6 +509,7 @@ def _list_query_gen(self, offset: int, list_type: str, direction: str, query_fil u'locationLatitude', u'locationLongitude', u'adjustmentType', u'timeZoneOffset', u'vidComplDurValue', u'vidComplDurScale', u'vidComplDispValue', u'vidComplDispScale', + u'keywordsEnc',u'extendedDescEnc',u'adjustedMediaMetaDataEnc',u'adjustmentSimpleDataEnc', u'vidComplVisibilityState', u'customRenderedValue', u'containerId', u'itemId', u'position', u'isKeyAsset' ], diff --git a/tests/test_xmp_sidecar.py b/tests/test_xmp_sidecar.py new file mode 100644 index 000000000..c5cc2ae3f --- /dev/null +++ b/tests/test_xmp_sidecar.py @@ -0,0 +1,81 @@ +from datetime import datetime +from typing import Any, Dict +from unittest import TestCase + +from foundation import version_info +from icloudpd.xmp_sidecar import XMPMetadata, build_metadata + + +class BuildXMPMetadata(TestCase): + def test_build_metadata(self) -> None: + assetRecordStub: Dict[str, Dict[str, Any]] = { + "fields": { + "captionEnc": {"value": "VGl0bGUgSGVyZQ==", "type": "ENCRYPTED_BYTES"}, + "extendedDescEnc": {"value": "Q2FwdGlvbiBIZXJl", "type": "ENCRYPTED_BYTES"}, + "adjustmentSimpleDataEnc": { + "type": "ENCRYPTED_BYTES", + "value": "PU67DoIwFP2XMzemBRKxo5Mummiig3G4QJEaCqS9sBD+XRDjdnLeI5xhKogJeoSjwMYfjH1VDB3LKBE/7m4LrqATGUcCrbemYWLbNtDpJFC23hHfjA9fSgkMKz42ZbsUZ72ti1PvMuOhESX7nYIAdd0/g61SG7lRqZyFkFfG0cUMdhWlQFcTLzOz01F+vmKepeLdB3bzlwD9eE4f", + }, + "assetSubtypeV2": {"value": 2, "type": "INT64"}, + "keywordsEnc": { + "value": "YnBsaXN0MDChAVxzb21lIGtleXdvcmQICgAAAAAAAAEBAAAAAAAAAAIAAAAAAAAAAAAAAAAAAAAX", + "type": "ENCRYPTED_BYTES", + }, + "locationEnc": { + "value": "YnBsaXN0MDDYAQIDBAUGBwgJCQoLCQwNCVZjb3Vyc2VVc3BlZWRTYWx0U2xvbld2ZXJ0QWNjU2xhdFl0aW1lc3RhbXBXaG9yekFjYyMAAAAAAAAAACNAdG9H6P0fpCNAWL2oZnRhiiNAMtKmTC+DezMAAAAAAAAAAAgZICYqLjY6RExVXmdwAAAAAAAAAQEAAAAAAAAADgAAAAAAAAAAAAAAAAAAAHk=", + "type": "ENCRYPTED_BYTES", + }, + "assetDate": {"value": 1532951050176, "type": "TIMESTAMP"}, + "isHidden": {"value": 0, "type": "INT64"}, + "isDeleted": {"value": 0, "type": "INT64"}, + "isFavorite": {"value": 0, "type": "INT64"}, + }, + } + + # Test full metadata record + metadata: XMPMetadata = build_metadata(assetRecordStub) + self.assertCountEqual( + metadata, + XMPMetadata( + XMPToolkit="icloudpd " + version_info.version + "+" + version_info.commit_sha, + Title="Title Here", + Description="Caption Here", + Orientation=8, + Make=None, + DigitalSourceType=None, + Keywords=["some keyword"], + GPSAltitude=326.9550561797753, + GPSLatitude=18.82285, + GPSLongitude=98.96340333333333, + GPSSpeed=0.0, + GPSTimeStamp=datetime.strptime( + "2001:01:01 00:00:00.000000+00:00", "%Y:%m:%d %H:%M:%S.%f%z" + ).replace(tzinfo=None), + CreateDate=datetime.strptime( + "2018:07:30 11:44:10.176000+00:00", "%Y:%m:%d %H:%M:%S.%f%z" + ), + Rating=None, + ), + ) + + # Test Screenshot Tagging + assetRecordStub["fields"]["assetSubtypeV2"]["value"] = 3 + metadata = build_metadata(assetRecordStub) + assert metadata.Make == "Screenshot" + assert metadata.DigitalSourceType == "screenCapture" + + # Test Favorites + assetRecordStub["fields"]["isFavorite"]["value"] = 1 + metadata = build_metadata(assetRecordStub) + assert metadata.Rating == 5 + + # Test Deleted + assetRecordStub["fields"]["isDeleted"]["value"] = 1 + metadata = build_metadata(assetRecordStub) + assert metadata.Rating == -1 + + # Test Hidden + assetRecordStub["fields"]["isDeleted"]["value"] = 0 + assetRecordStub["fields"]["isHidden"]["value"] = 1 + metadata = build_metadata(assetRecordStub) + assert metadata.Rating == -1