diff --git a/CHANGELOG.md b/CHANGELOG.md index 88105cc3f..664eeb679 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ## Unreleased -- feature: add webui for entering MFA code with `--mfa-provider` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805) +- feature: add webui for entering password with `--password-provider webui` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805) +- feature: add webui for entering MFA code with `--mfa-provider webui` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805) - fix: allow MFA with leading zeros [ref](https://github.com/boredazfcuk/docker-icloudpd/issues/599) ## 1.20.4 (2024-06-30) diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index 1b02e2c2a..af77389aa 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -182,8 +182,8 @@ def request_2fa_web( icloud: PyiCloudService, logger: logging.Logger, status_exchange: StatusExchange ) -> None: """Request two-factor authentication through Webui.""" - if not status_exchange.replace_status(Status.NOT_NEED_MFA, Status.NEED_MFA): - logger.error("Expected NOT_NEED_MFA, but got something else") + if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_MFA): + logger.error("Expected NO_INPUT_NEEDED, but got something else") return # wait for input @@ -195,17 +195,21 @@ def request_2fa_web( break if status_exchange.replace_status(Status.SUPPLIED_MFA, Status.CHECKING_MFA): - code = status_exchange.get_code() + code = status_exchange.get_payload() if not code: logger.error("Internal error: did not get code for SUPPLIED_MFA status") - status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # TODO Error + status_exchange.replace_status( + Status.CHECKING_MFA, Status.NO_INPUT_NEEDED + ) # TODO Error return if not icloud.validate_2fa_code(code): logger.error("Failed to verify two-factor authentication code") - status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # TODO Error + status_exchange.replace_status( + Status.CHECKING_MFA, Status.NO_INPUT_NEEDED + ) # TODO Error return - status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # done + status_exchange.replace_status(Status.CHECKING_MFA, Status.NO_INPUT_NEEDED) # done logger.info( "Great, you're all set up. The script can now be run without " diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index 8d2bd26a7..f9546e477 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -58,7 +58,7 @@ from icloudpd.email_notifications import send_2sa_notification from icloudpd.paths import clean_filename, local_download_path, remove_unicode_chars from icloudpd.server import serve_app -from icloudpd.status import StatusExchange +from icloudpd.status import Status, StatusExchange from icloudpd.string_helpers import truncate_middle @@ -148,6 +148,46 @@ def ask_password_in_console(_user: str) -> Optional[str]: # ) +def get_password_from_webui( + logger: Logger, status_exchange: StatusExchange +) -> Callable[[str], Optional[str]]: + def _intern(_user: str) -> Optional[str]: + """Request two-factor authentication through Webui.""" + if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_PASSWORD): + logger.error("Expected NO_INPUT_NEEDED, but got something else") + return None + + # wait for input + while True: + status = status_exchange.get_status() + if status == Status.NEED_PASSWORD: + time.sleep(1) + else: + break + if status_exchange.replace_status(Status.SUPPLIED_PASSWORD, Status.CHECKING_PASSWORD): + password = status_exchange.get_payload() + if not password: + logger.error("Internal error: did not get password for SUPPLIED_PASSWORD status") + status_exchange.replace_status( + Status.CHECKING_PASSWORD, Status.NO_INPUT_NEEDED + ) # TODO Error + return None + return password + + return None # TODO + + return _intern + + +def update_password_status_in_webui(status_exchange: StatusExchange) -> Callable[[str, str], None]: + def _intern(_u: str, _p: str) -> None: + # TODO we are not handling wrong passwords... + status_exchange.replace_status(Status.CHECKING_PASSWORD, Status.NO_INPUT_NEEDED) + return None + + return _intern + + # def get_click_param_by_name(_name: str, _params: List[Parameter]) -> Optional[Parameter]: # _with_password = [_p for _p in _params if _name in _p.name] # if len(_with_password) == 0: @@ -163,6 +203,8 @@ def password_provider_generator( _ctx: click.Context, _param: click.Parameter, providers: Sequence[str] ) -> Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]: def _map(provider: str) -> Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]: + if provider == "webui": + return (ask_password_in_console, dummy_password_writter) if provider == "console": return (ask_password_in_console, dummy_password_writter) elif provider == "keyring": @@ -456,7 +498,7 @@ def file_match_policy_generator( "--password-provider", "password_providers", help="Specifies passwords provider to check in the specified order", - type=click.Choice(["console", "keyring", "parameter"], case_sensitive=False), + type=click.Choice(["console", "keyring", "parameter", "webui"], case_sensitive=False), default=["parameter", "keyring", "console"], show_default=True, multiple=True, @@ -573,8 +615,28 @@ def main( print("You need to specify at least one --password-provider") sys.exit(2) + if "console" in password_providers and "webui" in password_providers: + print("Console and webui are not compatible in --password-provider") + sys.exit(2) + + if "console" in password_providers and list(password_providers)[-1] != "console": + print("Console must be the last --password-provider") + sys.exit(2) + + if "webui" in password_providers and list(password_providers)[-1] != "webui": + print("Webui must be the last --password-provider") + sys.exit(2) + status_exchange = StatusExchange() + # hacky way to use one param in another + if "webui" in password_providers: + # replace + password_providers["webui"] = ( + get_password_from_webui(logger, status_exchange), + update_password_status_in_webui(status_exchange), + ) + # start web server if mfa_provider == MFAProvider.WEBUI: server_thread = Thread(target=serve_app, daemon=True, args=[logger, status_exchange]) diff --git a/src/icloudpd/server/__init__.py b/src/icloudpd/server/__init__.py index a41cbd289..2b8e90d1d 100644 --- a/src/icloudpd/server/__init__.py +++ b/src/icloudpd/server/__init__.py @@ -18,21 +18,33 @@ def index() -> Union[Response, str]: @app.route("/status", methods=["GET"]) def get_status() -> Union[Response, str]: _status = _status_exchange.get_status() - if _status == Status.NOT_NEED_MFA: + if _status == Status.NO_INPUT_NEEDED: return render_template("no_input.html") if _status == Status.NEED_MFA: return render_template("code.html") + if _status == Status.NEED_PASSWORD: + return render_template("password.html") return render_template("status.html", status=_status) @app.route("/code", methods=["POST"]) def set_code() -> Union[Response, str]: code = request.form.get("code") if code is not None: - if _status_exchange.set_code(code): - return render_template("submitted.html") + if _status_exchange.set_payload(code): + return render_template("code_submitted.html") else: logger.error(f"cannot find code in request {request.form}") return make_response(render_template("error.html"), 400) # incorrect code + @app.route("/password", methods=["POST"]) + def set_password() -> Union[Response, str]: + password = request.form.get("password") + if password is not None: + if _status_exchange.set_payload(password): + return render_template("password_submitted.html") + else: + logger.error(f"cannot find password in request {request.form}") + return make_response(render_template("error.html"), 400) # incorrect code + logger.debug("Starting web server...") return waitress.serve(app) diff --git a/src/icloudpd/server/templates/submitted.html b/src/icloudpd/server/templates/code_submitted.html similarity index 100% rename from src/icloudpd/server/templates/submitted.html rename to src/icloudpd/server/templates/code_submitted.html diff --git a/src/icloudpd/server/templates/password.html b/src/icloudpd/server/templates/password.html new file mode 100644 index 000000000..dcaa62c18 --- /dev/null +++ b/src/icloudpd/server/templates/password.html @@ -0,0 +1,6 @@ +
+

Enter Password

+ + + +
\ No newline at end of file diff --git a/src/icloudpd/server/templates/password_submitted.html b/src/icloudpd/server/templates/password_submitted.html new file mode 100644 index 000000000..725e05eb8 --- /dev/null +++ b/src/icloudpd/server/templates/password_submitted.html @@ -0,0 +1,4 @@ +
+

Password submitted and being checked

+ +
diff --git a/src/icloudpd/status.py b/src/icloudpd/status.py index 06575628f..83dc3bcfc 100644 --- a/src/icloudpd/status.py +++ b/src/icloudpd/status.py @@ -4,17 +4,20 @@ class Status(Enum): - NOT_NEED_MFA = "not_need_mfa" + NO_INPUT_NEEDED = "no_input_needed" NEED_MFA = "need_mfa" SUPPLIED_MFA = "supplied_mfa" CHECKING_MFA = "checking_mfa" + NEED_PASSWORD = "need_password" + SUPPLIED_PASSWORD = "supplied_password" + CHECKING_PASSWORD = "checking_password" class StatusExchange: def __init__(self) -> None: self.lock = Lock() - self._status = Status.NOT_NEED_MFA - self._code: Optional[str] = None + self._status = Status.NO_INPUT_NEEDED + self._payload: Optional[str] = None def get_status(self) -> Status: with self.lock: @@ -28,18 +31,25 @@ def replace_status(self, expected_status: Status, new_status: Status) -> bool: else: return False - def set_code(self, code: str) -> bool: + def set_payload(self, payload: str) -> bool: with self.lock: - if self._status != Status.NEED_MFA: + if self._status != Status.NEED_MFA and self._status != Status.NEED_PASSWORD: return False - self._code = code - self._status = Status.SUPPLIED_MFA + self._payload = payload + self._status = ( + Status.SUPPLIED_MFA if self._status == Status.NEED_MFA else Status.SUPPLIED_PASSWORD + ) return True - def get_code(self) -> Optional[str]: + def get_payload(self) -> Optional[str]: with self.lock: - if self._status not in [Status.SUPPLIED_MFA, Status.CHECKING_MFA]: + if self._status not in [ + Status.SUPPLIED_MFA, + Status.CHECKING_MFA, + Status.SUPPLIED_PASSWORD, + Status.CHECKING_PASSWORD, + ]: return None - return self._code + return self._payload