diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index da5a37864..fdcfc8994 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -14,13 +14,6 @@ from icloudpd.status import Status, StatusExchange -class TwoStepAuthRequiredError(Exception): - """ - Raised when 2SA is required. base.py catches this exception - and sends an email notification. - """ - - def authenticator( logger: logging.Logger, domain: str, @@ -33,13 +26,13 @@ def authenticator( ], mfa_provider: MFAProvider, status_exchange: StatusExchange, -) -> Callable[[str, Optional[str], bool, Optional[str]], PyiCloudService]: +) -> Callable[[str, Optional[str], Optional[Callable[[], None]], Optional[str]], PyiCloudService]: """Wraping authentication with domain context""" def authenticate_( username: str, cookie_directory: Optional[str] = None, - raise_error_on_2sa: bool = False, + mfa_error_callable: Optional[Callable[[], None]] = None, client_id: Optional[str] = None, ) -> PyiCloudService: """Authenticate with iCloud username and password""" @@ -74,8 +67,8 @@ def authenticate_( _writer(username, _valid_password) if icloud.requires_2fa: - if raise_error_on_2sa: - raise TwoStepAuthRequiredError("Two-factor authentication is required") + if mfa_error_callable: + mfa_error_callable() logger.info("Two-factor authentication is required (2fa)") if mfa_provider == MFAProvider.WEBUI: request_2fa_web(icloud, logger, status_exchange) @@ -83,8 +76,8 @@ def authenticate_( request_2fa(icloud, logger) elif icloud.requires_2sa: - if raise_error_on_2sa: - raise TwoStepAuthRequiredError("Two-step authentication is required") + if mfa_error_callable: + mfa_error_callable() logger.info("Two-step authentication is required (2sa)") request_2sa(icloud, logger) diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index 47cb41579..8ee5a046e 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -5,6 +5,9 @@ import foundation from foundation.core import compose, constant, identity +from icloudpd.notifier import Notifier +from icloudpd.ntfy import NtfySender +from icloudpd.script_notifier import ScriptNotifier from pyicloud_ipd.item_type import AssetItemType # fmt: skip from icloudpd.mfa_provider import MFAProvider @@ -55,11 +58,11 @@ from tzlocal import get_localzone from icloudpd import constants, download, exif_datetime -from icloudpd.authentication import TwoStepAuthRequiredError, authenticator +from icloudpd.authentication import authenticator from icloudpd.autodelete import autodelete_photos from icloudpd.config import Config from icloudpd.counter import Counter -from icloudpd.email_notifications import send_2sa_notification +from icloudpd.email_notifications import EmailNotifier from icloudpd.paths import clean_filename, local_download_path, remove_unicode_chars from icloudpd.server import serve_app from icloudpd.status import Status, StatusExchange @@ -457,6 +460,57 @@ def report_version(ctx: click.Context, _param: click.Parameter, value: bool) -> help="Runs an external script when two factor authentication expires. " "(path required: /path/to/my/script.sh)", ) +@click.option( + "--ntfy-server", + help="Ntfy server URL for notifications", + metavar="", +) +@click.option( + "--ntfy-topic", + help="Ntfy topic for notifications", + metavar="", +) +@click.option( + "--ntfy-protocol", + help="Ntfy protocol for notifications. Default is HTTPS.", + metavar="", +) +@click.option( + "--ntfy-username", + help="Ntfy username for notifications", + metavar="", +) +@click.option( + "--ntfy-password", + help="Ntfy password for notifications", + metavar="", +) +@click.option( + "--ntfy-token", + help="Ntfy token for notifications", + metavar="", +) +@click.option( + "--ntfy-priority", + help="Ntfy priority for notifications", + metavar="", +) +@click.option( + "--ntfy-tag", + help="Ntfy tags for notifications", + metavar="", + multiple=True, +) +@click.option( + "--ntfy-click", + help="Ntfy click action for notifications", + metavar="", +) +@click.option( + "--ntfy-email", + help="Ntfy email for notifications", + metavar="", +) @click.option( "--log-level", help="Log level (default: debug)", @@ -598,6 +652,16 @@ def main( log_level: str, no_progress_bar: bool, notification_script: Optional[str], + ntfy_server: Optional[str], + ntfy_topic: Optional[str], + ntfy_protocol: Optional[str], + ntfy_username: Optional[str], + ntfy_password: Optional[str], + ntfy_token: Optional[str], + ntfy_priority: Optional[str], + ntfy_tags: Optional[list[str]], + ntfy_click: Optional[str], + ntfy_email: Optional[str], threads_num: int, delete_after_download: bool, domain: str, @@ -679,6 +743,38 @@ def main( sys.exit(2) status_exchange = StatusExchange() + notifiers = [] + + if smtp_username or notification_email: + notifiers.append( + EmailNotifier( + smtp_host, + smtp_port, + smtp_no_tls, + smtp_username, + smtp_password, + notification_email, + notification_email_from, + ) + ) + + if notification_script: + notifiers.append(ScriptNotifier(notification_script)) + + if ntfy_server: + notifiers.append(NtfySender( + ntfy_server, + ntfy_topic, + ntfy_protocol, + ntfy_username, + ntfy_password, + ntfy_token, + ntfy_priority, + ntfy_tags, + ntfy_click, + ntfy_email + )) + config = Config( directory=directory, username=username, @@ -718,6 +814,7 @@ def main( file_match_policy=file_match_policy, mfa_provider=mfa_provider, use_os_locale=use_os_locale, + notifiers=notifiers ) status_exchange.set_config(config) @@ -774,15 +871,7 @@ def main( auto_delete, only_print_filenames, folder_structure, - smtp_username, - smtp_password, - smtp_host, - smtp_port, - smtp_no_tls, - notification_email, - notification_email_from, no_progress_bar, - notification_script, delete_after_download, domain, logger, @@ -795,6 +884,7 @@ def main( password_providers, mfa_provider, status_exchange, + notifiers ) sys.exit(result) @@ -1148,15 +1238,7 @@ def core( auto_delete: bool, only_print_filenames: bool, folder_structure: str, - smtp_username: Optional[str], - smtp_password: Optional[str], - smtp_host: str, - smtp_port: int, - smtp_no_tls: bool, - notification_email: Optional[str], - notification_email_from: Optional[str], no_progress_bar: bool, - notification_script: Optional[str], delete_after_download: bool, domain: str, logger: logging.Logger, @@ -1171,46 +1253,37 @@ def core( ], mfa_provider: MFAProvider, status_exchange: StatusExchange, + notifiers: list[Notifier] = [], ) -> int: """Download all iCloud photos to a local directory""" - raise_error_on_2sa = ( - smtp_username is not None - or notification_email is not None - or notification_script is not None + def _notify_mfa_error(): + title = "iCloud Photos Downloader: MFA" + msg = """Multi-factor authentication has expired. +Please log in to your server and update MFA.""" + + for notifier in notifiers: + try: + notifier.send_notification(title=title, message=msg) + except Exception as ex: + logger.error("Notifier %s failed with %s", notifier.__class__.__name__, ex) + + icloud = authenticator( + logger, + domain, + filename_cleaner, + lp_filename_generator, + raw_policy, + file_match_policy, + password_providers, + mfa_provider, + status_exchange, + )( + username, + cookie_directory, + _notify_mfa_error if len(notifiers) > 0 else None, + os.environ.get("CLIENT_ID"), ) - try: - icloud = authenticator( - logger, - domain, - filename_cleaner, - lp_filename_generator, - raw_policy, - file_match_policy, - password_providers, - mfa_provider, - status_exchange, - )( - username, - cookie_directory, - raise_error_on_2sa, - os.environ.get("CLIENT_ID"), - ) - except TwoStepAuthRequiredError: - if notification_script is not None: - subprocess.call([notification_script]) - if smtp_username is not None or notification_email is not None: - send_2sa_notification( - logger, - smtp_username, - smtp_password, - smtp_host, - smtp_port, - smtp_no_tls, - notification_email, - notification_email_from, - ) - return 1 if auth_only: logger.info("Authentication completed successfully") diff --git a/src/icloudpd/config.py b/src/icloudpd/config.py index 4a25203e2..12fa5d90b 100644 --- a/src/icloudpd/config.py +++ b/src/icloudpd/config.py @@ -1,5 +1,6 @@ from typing import Callable, Dict, Optional, Sequence, Tuple +from icloudpd.notifier import Notifier from pyicloud_ipd.file_match import FileMatchPolicy from pyicloud_ipd.raw_policy import RawTreatmentPolicy from pyicloud_ipd.version_size import AssetVersionSize, LivePhotoVersionSize @@ -50,6 +51,7 @@ def __init__( file_match_policy: FileMatchPolicy, mfa_provider: MFAProvider, use_os_locale: bool, + notifiers: list[Notifier] = [] ): self.directory = directory self.username = username @@ -89,3 +91,4 @@ def __init__( self.file_match_policy = file_match_policy self.mfa_provider = mfa_provider self.use_os_locale = use_os_locale + self.notifiers = notifiers diff --git a/src/icloudpd/email_notifications.py b/src/icloudpd/email_notifications.py index 8f716a04a..44d94debf 100644 --- a/src/icloudpd/email_notifications.py +++ b/src/icloudpd/email_notifications.py @@ -1,49 +1,44 @@ -"""Send an email notification when 2SA is expired""" - import datetime -import logging import smtplib -from typing import Optional, cast - - -def send_2sa_notification( - logger: logging.Logger, - smtp_email: Optional[str], - smtp_password: Optional[str], - smtp_host: str, - smtp_port: int, - smtp_no_tls: bool, - to_addr: Optional[str], - from_addr: Optional[str] = None, -) -> None: - """Send an email notification when 2SA is expired""" - to_addr = cast(str, to_addr if to_addr is not None else smtp_email) - from_addr = ( - from_addr - if from_addr is not None - else (f"iCloud Photos Downloader <{smtp_email}>" if smtp_email else to_addr) - ) - logger.info("Sending 'two-step expired' notification via email...") - smtp = smtplib.SMTP(smtp_host, smtp_port) - smtp.set_debuglevel(0) - # leaving explicit call of connect to not break unit tests, even though it is - # called implicitly via constructor parameters - smtp.connect(smtp_host, smtp_port) - if not smtp_no_tls: - smtp.starttls() - - if smtp_email is not None and smtp_password is not None: - smtp.login(smtp_email, smtp_password) - - subj = "icloud_photos_downloader: Two step authentication has expired" - date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M") - - message_text = """Hello, - -Two-step authentication has expired for the icloud_photos_downloader script. -Please log in to your server and run the script manually to update two-step authentication.""" - - msg = f"From: {from_addr}\n" + f"To: {to_addr}\nSubject: {subj}\nDate: {date}\n\n{message_text}" - - smtp.sendmail(from_addr, to_addr, msg) - smtp.quit() + +from icloudpd.notifier import Notifier + + +class EmailNotifier(Notifier): + def __init__(self, + smtp_host: str, + smtp_port: int = 587, + smtp_no_tls: bool = False, + smtp_email: str | None = None, + smtp_password: str | None = None, + to_addr: str | None = None, + from_addr: str | None = None) -> None: + self.smtp_email = smtp_email + self.smtp_password = smtp_password + self.smtp_host = smtp_host + self.smtp_port = smtp_port + self.smtp_no_tls = smtp_no_tls + self.to_addr = to_addr or smtp_email + self.from_addr = from_addr or (f"iCloud Photos Downloader <{smtp_email}>" if smtp_email else to_addr) + + + def send_notification(self, + message: str, + title: str | None = None) -> None: + smtp = smtplib.SMTP(self.smtp_host, self.smtp_port) + smtp.set_debuglevel(0) + # leaving explicit call of connect to not break unit tests, even though it is + # called implicitly via constructor parameters + smtp.connect(self.smtp_host, self.smtp_port) + if not self.smtp_no_tls: + smtp.starttls() + + if self.smtp_email is not None and self.smtp_password is not None: + smtp.login(self.smtp_email, self.smtp_password) + + date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M") + + msg = f"From: {self.from_addr}\n" + f"To: {self.to_addr}\nSubject: {title}\nDate: {date}\n\n{message}" + + smtp.sendmail(self.from_addr, self.to_addr, msg) + smtp.quit() diff --git a/src/icloudpd/notifier.py b/src/icloudpd/notifier.py new file mode 100644 index 000000000..a305cf88a --- /dev/null +++ b/src/icloudpd/notifier.py @@ -0,0 +1,8 @@ +from abc import ABC + + +class Notifier(ABC): + def send_notification(self, + message: str, + title: str | None = None) -> None: + ... \ No newline at end of file diff --git a/src/icloudpd/ntfy.py b/src/icloudpd/ntfy.py new file mode 100644 index 000000000..4166d720d --- /dev/null +++ b/src/icloudpd/ntfy.py @@ -0,0 +1,73 @@ +import requests +from requests.auth import HTTPBasicAuth + +from icloudpd.notifier import Notifier + + +class NtfySender(Notifier): + + def __init__(self, + server: str, + topic: str, + protocol: str = "https", + username: str | None = None, + password: str | None = None, + token: str | None = None, + priority: str | None = None, + tags: list[str] = [], + click: str | None = None, + email: str | None = None) -> None: + self.server = server + self.topic = topic + self.protocol = protocol + self.username = username + self.password = password + self.token = token + self.priority = priority + self.tags = tags + self.click = click + self.email = email + + + def _build_url(self) -> str: + return f"{self.protocol}://{self.server}/{self.topic}" + + + def _build_headers(self, title: str | None = None) -> dict[str, str]: + headers = {} + + if title: + headers["X-Title"] = title + + if self.priority: + headers["X-Priority"] = self.priority + + if len(self.tags) > 0: + headers["X-Tags"] = ",".join(self.tags) + + if self.click: + headers["X-Click"] = self.click + + if self.email: + headers["X-Email"] = self.email + + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + + return headers + + + def _build_auth(self) -> None: + if self.username or self.password: + return HTTPBasicAuth(self.username, self.password) + return None + + + def send_notification(self, + message: str, + title: str | None = None) -> None: + response = requests.post(self._build_url(), + data=message.encode(encoding='utf-8'), + headers=self._build_headers(title), + auth=self._build_auth()) + response.raise_for_status() \ No newline at end of file diff --git a/src/icloudpd/script_notifier.py b/src/icloudpd/script_notifier.py new file mode 100644 index 000000000..ef43a0435 --- /dev/null +++ b/src/icloudpd/script_notifier.py @@ -0,0 +1,10 @@ +import subprocess +from icloudpd.notifier import Notifier + + +class ScriptNotifier(Notifier): + def __init__(self, script: str) -> None: + self.script = script + + def send_notification(self, message: str, title: str | None = None) -> None: + subprocess.run([self.script]) \ No newline at end of file