Skip to content

Commit

Permalink
add webui for password #805 (#893)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyNikiforov authored Jul 5, 2024
1 parent 9992078 commit 4b4df49
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 22 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Unreleased

- feature: add webui for entering MFA code with `--mfa-provider` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805)
- feature: add webui for entering password with `--password-provider webui` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805)
- feature: add webui for entering MFA code with `--mfa-provider webui` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805)
- fix: allow MFA with leading zeros [ref](https://github.com/boredazfcuk/docker-icloudpd/issues/599)

## 1.20.4 (2024-06-30)
Expand Down
16 changes: 10 additions & 6 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ def request_2fa_web(
icloud: PyiCloudService, logger: logging.Logger, status_exchange: StatusExchange
) -> None:
"""Request two-factor authentication through Webui."""
if not status_exchange.replace_status(Status.NOT_NEED_MFA, Status.NEED_MFA):
logger.error("Expected NOT_NEED_MFA, but got something else")
if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_MFA):
logger.error("Expected NO_INPUT_NEEDED, but got something else")
return

# wait for input
Expand All @@ -195,17 +195,21 @@ def request_2fa_web(
break

if status_exchange.replace_status(Status.SUPPLIED_MFA, Status.CHECKING_MFA):
code = status_exchange.get_code()
code = status_exchange.get_payload()
if not code:
logger.error("Internal error: did not get code for SUPPLIED_MFA status")
status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # TODO Error
status_exchange.replace_status(
Status.CHECKING_MFA, Status.NO_INPUT_NEEDED
) # TODO Error
return

if not icloud.validate_2fa_code(code):
logger.error("Failed to verify two-factor authentication code")
status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # TODO Error
status_exchange.replace_status(
Status.CHECKING_MFA, Status.NO_INPUT_NEEDED
) # TODO Error
return
status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # done
status_exchange.replace_status(Status.CHECKING_MFA, Status.NO_INPUT_NEEDED) # done

logger.info(
"Great, you're all set up. The script can now be run without "
Expand Down
66 changes: 64 additions & 2 deletions src/icloudpd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from icloudpd.email_notifications import send_2sa_notification
from icloudpd.paths import clean_filename, local_download_path, remove_unicode_chars
from icloudpd.server import serve_app
from icloudpd.status import StatusExchange
from icloudpd.status import Status, StatusExchange
from icloudpd.string_helpers import truncate_middle


Expand Down Expand Up @@ -148,6 +148,46 @@ def ask_password_in_console(_user: str) -> Optional[str]:
# )


def get_password_from_webui(
logger: Logger, status_exchange: StatusExchange
) -> Callable[[str], Optional[str]]:
def _intern(_user: str) -> Optional[str]:
"""Request two-factor authentication through Webui."""
if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_PASSWORD):
logger.error("Expected NO_INPUT_NEEDED, but got something else")
return None

# wait for input
while True:
status = status_exchange.get_status()
if status == Status.NEED_PASSWORD:
time.sleep(1)
else:
break
if status_exchange.replace_status(Status.SUPPLIED_PASSWORD, Status.CHECKING_PASSWORD):
password = status_exchange.get_payload()
if not password:
logger.error("Internal error: did not get password for SUPPLIED_PASSWORD status")
status_exchange.replace_status(
Status.CHECKING_PASSWORD, Status.NO_INPUT_NEEDED
) # TODO Error
return None
return password

return None # TODO

return _intern


def update_password_status_in_webui(status_exchange: StatusExchange) -> Callable[[str, str], None]:
def _intern(_u: str, _p: str) -> None:
# TODO we are not handling wrong passwords...
status_exchange.replace_status(Status.CHECKING_PASSWORD, Status.NO_INPUT_NEEDED)
return None

return _intern


# def get_click_param_by_name(_name: str, _params: List[Parameter]) -> Optional[Parameter]:
# _with_password = [_p for _p in _params if _name in _p.name]
# if len(_with_password) == 0:
Expand All @@ -163,6 +203,8 @@ def password_provider_generator(
_ctx: click.Context, _param: click.Parameter, providers: Sequence[str]
) -> Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]:
def _map(provider: str) -> Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]:
if provider == "webui":
return (ask_password_in_console, dummy_password_writter)
if provider == "console":
return (ask_password_in_console, dummy_password_writter)
elif provider == "keyring":
Expand Down Expand Up @@ -456,7 +498,7 @@ def file_match_policy_generator(
"--password-provider",
"password_providers",
help="Specifies passwords provider to check in the specified order",
type=click.Choice(["console", "keyring", "parameter"], case_sensitive=False),
type=click.Choice(["console", "keyring", "parameter", "webui"], case_sensitive=False),
default=["parameter", "keyring", "console"],
show_default=True,
multiple=True,
Expand Down Expand Up @@ -573,8 +615,28 @@ def main(
print("You need to specify at least one --password-provider")
sys.exit(2)

if "console" in password_providers and "webui" in password_providers:
print("Console and webui are not compatible in --password-provider")
sys.exit(2)

if "console" in password_providers and list(password_providers)[-1] != "console":
print("Console must be the last --password-provider")
sys.exit(2)

if "webui" in password_providers and list(password_providers)[-1] != "webui":
print("Webui must be the last --password-provider")
sys.exit(2)

status_exchange = StatusExchange()

# hacky way to use one param in another
if "webui" in password_providers:
# replace
password_providers["webui"] = (
get_password_from_webui(logger, status_exchange),
update_password_status_in_webui(status_exchange),
)

# start web server
if mfa_provider == MFAProvider.WEBUI:
server_thread = Thread(target=serve_app, daemon=True, args=[logger, status_exchange])
Expand Down
18 changes: 15 additions & 3 deletions src/icloudpd/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,33 @@ def index() -> Union[Response, str]:
@app.route("/status", methods=["GET"])
def get_status() -> Union[Response, str]:
_status = _status_exchange.get_status()
if _status == Status.NOT_NEED_MFA:
if _status == Status.NO_INPUT_NEEDED:
return render_template("no_input.html")
if _status == Status.NEED_MFA:
return render_template("code.html")
if _status == Status.NEED_PASSWORD:
return render_template("password.html")
return render_template("status.html", status=_status)

@app.route("/code", methods=["POST"])
def set_code() -> Union[Response, str]:
code = request.form.get("code")
if code is not None:
if _status_exchange.set_code(code):
return render_template("submitted.html")
if _status_exchange.set_payload(code):
return render_template("code_submitted.html")
else:
logger.error(f"cannot find code in request {request.form}")
return make_response(render_template("error.html"), 400) # incorrect code

@app.route("/password", methods=["POST"])
def set_password() -> Union[Response, str]:
password = request.form.get("password")
if password is not None:
if _status_exchange.set_payload(password):
return render_template("password_submitted.html")
else:
logger.error(f"cannot find password in request {request.form}")
return make_response(render_template("error.html"), 400) # incorrect code

logger.debug("Starting web server...")
return waitress.serve(app)
6 changes: 6 additions & 0 deletions src/icloudpd/server/templates/password.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<form hx-post="/password" hx-swap="outerHTML">
<p>Enter Password</p>
<label for="password">Password:</label>
<input type="password" id="password" name="password"/>
<button type="submit">Submit</button>
</form>
4 changes: 4 additions & 0 deletions src/icloudpd/server/templates/password_submitted.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<div id="zone">
<p>Password submitted and being checked</p>
<button hx-get="/status" hx-trigger="load delay:5sec" hx-target="#zone">Refresh</button>
</div>
30 changes: 20 additions & 10 deletions src/icloudpd/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@


class Status(Enum):
NOT_NEED_MFA = "not_need_mfa"
NO_INPUT_NEEDED = "no_input_needed"
NEED_MFA = "need_mfa"
SUPPLIED_MFA = "supplied_mfa"
CHECKING_MFA = "checking_mfa"
NEED_PASSWORD = "need_password"
SUPPLIED_PASSWORD = "supplied_password"
CHECKING_PASSWORD = "checking_password"


class StatusExchange:
def __init__(self) -> None:
self.lock = Lock()
self._status = Status.NOT_NEED_MFA
self._code: Optional[str] = None
self._status = Status.NO_INPUT_NEEDED
self._payload: Optional[str] = None

def get_status(self) -> Status:
with self.lock:
Expand All @@ -28,18 +31,25 @@ def replace_status(self, expected_status: Status, new_status: Status) -> bool:
else:
return False

def set_code(self, code: str) -> bool:
def set_payload(self, payload: str) -> bool:
with self.lock:
if self._status != Status.NEED_MFA:
if self._status != Status.NEED_MFA and self._status != Status.NEED_PASSWORD:
return False

self._code = code
self._status = Status.SUPPLIED_MFA
self._payload = payload
self._status = (
Status.SUPPLIED_MFA if self._status == Status.NEED_MFA else Status.SUPPLIED_PASSWORD
)
return True

def get_code(self) -> Optional[str]:
def get_payload(self) -> Optional[str]:
with self.lock:
if self._status not in [Status.SUPPLIED_MFA, Status.CHECKING_MFA]:
if self._status not in [
Status.SUPPLIED_MFA,
Status.CHECKING_MFA,
Status.SUPPLIED_PASSWORD,
Status.CHECKING_PASSWORD,
]:
return None

return self._code
return self._payload

0 comments on commit 4b4df49

Please sign in to comment.