Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add webui for password #805 #893

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

## Unreleased

- feature: add webui for entering MFA code with `--mfa-provider` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805)
- feature: add webui for entering password with `--password-provider webui` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805)
- feature: add webui for entering MFA code with `--mfa-provider webui` parameter [#805](https://github.com/icloud-photos-downloader/icloud_photos_downloader/issues/805)
- fix: allow MFA with leading zeros [ref](https://github.com/boredazfcuk/docker-icloudpd/issues/599)

## 1.20.4 (2024-06-30)
Expand Down
16 changes: 10 additions & 6 deletions src/icloudpd/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ def request_2fa_web(
icloud: PyiCloudService, logger: logging.Logger, status_exchange: StatusExchange
) -> None:
"""Request two-factor authentication through Webui."""
if not status_exchange.replace_status(Status.NOT_NEED_MFA, Status.NEED_MFA):
logger.error("Expected NOT_NEED_MFA, but got something else")
if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_MFA):
logger.error("Expected NO_INPUT_NEEDED, but got something else")
return

# wait for input
Expand All @@ -195,17 +195,21 @@ def request_2fa_web(
break

if status_exchange.replace_status(Status.SUPPLIED_MFA, Status.CHECKING_MFA):
code = status_exchange.get_code()
code = status_exchange.get_payload()
if not code:
logger.error("Internal error: did not get code for SUPPLIED_MFA status")
status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # TODO Error
status_exchange.replace_status(
Status.CHECKING_MFA, Status.NO_INPUT_NEEDED
) # TODO Error
return

if not icloud.validate_2fa_code(code):
logger.error("Failed to verify two-factor authentication code")
status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # TODO Error
status_exchange.replace_status(
Status.CHECKING_MFA, Status.NO_INPUT_NEEDED
) # TODO Error
return
status_exchange.replace_status(Status.CHECKING_MFA, Status.NOT_NEED_MFA) # done
status_exchange.replace_status(Status.CHECKING_MFA, Status.NO_INPUT_NEEDED) # done

logger.info(
"Great, you're all set up. The script can now be run without "
Expand Down
66 changes: 64 additions & 2 deletions src/icloudpd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
from icloudpd.email_notifications import send_2sa_notification
from icloudpd.paths import clean_filename, local_download_path, remove_unicode_chars
from icloudpd.server import serve_app
from icloudpd.status import StatusExchange
from icloudpd.status import Status, StatusExchange
from icloudpd.string_helpers import truncate_middle


Expand Down Expand Up @@ -148,6 +148,46 @@ def ask_password_in_console(_user: str) -> Optional[str]:
# )


def get_password_from_webui(
logger: Logger, status_exchange: StatusExchange
) -> Callable[[str], Optional[str]]:
def _intern(_user: str) -> Optional[str]:
"""Request two-factor authentication through Webui."""
if not status_exchange.replace_status(Status.NO_INPUT_NEEDED, Status.NEED_PASSWORD):
logger.error("Expected NO_INPUT_NEEDED, but got something else")
return None

# wait for input
while True:
status = status_exchange.get_status()
if status == Status.NEED_PASSWORD:
time.sleep(1)
else:
break
if status_exchange.replace_status(Status.SUPPLIED_PASSWORD, Status.CHECKING_PASSWORD):
password = status_exchange.get_payload()
if not password:
logger.error("Internal error: did not get password for SUPPLIED_PASSWORD status")
status_exchange.replace_status(
Status.CHECKING_PASSWORD, Status.NO_INPUT_NEEDED
) # TODO Error
return None
return password

return None # TODO

return _intern


def update_password_status_in_webui(status_exchange: StatusExchange) -> Callable[[str, str], None]:
def _intern(_u: str, _p: str) -> None:
# TODO we are not handling wrong passwords...
status_exchange.replace_status(Status.CHECKING_PASSWORD, Status.NO_INPUT_NEEDED)
return None

return _intern


# def get_click_param_by_name(_name: str, _params: List[Parameter]) -> Optional[Parameter]:
# _with_password = [_p for _p in _params if _name in _p.name]
# if len(_with_password) == 0:
Expand All @@ -163,6 +203,8 @@ def password_provider_generator(
_ctx: click.Context, _param: click.Parameter, providers: Sequence[str]
) -> Dict[str, Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]]:
def _map(provider: str) -> Tuple[Callable[[str], Optional[str]], Callable[[str, str], None]]:
if provider == "webui":
return (ask_password_in_console, dummy_password_writter)
if provider == "console":
return (ask_password_in_console, dummy_password_writter)
elif provider == "keyring":
Expand Down Expand Up @@ -456,7 +498,7 @@ def file_match_policy_generator(
"--password-provider",
"password_providers",
help="Specifies passwords provider to check in the specified order",
type=click.Choice(["console", "keyring", "parameter"], case_sensitive=False),
type=click.Choice(["console", "keyring", "parameter", "webui"], case_sensitive=False),
default=["parameter", "keyring", "console"],
show_default=True,
multiple=True,
Expand Down Expand Up @@ -573,8 +615,28 @@ def main(
print("You need to specify at least one --password-provider")
sys.exit(2)

if "console" in password_providers and "webui" in password_providers:
print("Console and webui are not compatible in --password-provider")
sys.exit(2)

if "console" in password_providers and list(password_providers)[-1] != "console":
print("Console must be the last --password-provider")
sys.exit(2)

if "webui" in password_providers and list(password_providers)[-1] != "webui":
print("Webui must be the last --password-provider")
sys.exit(2)

status_exchange = StatusExchange()

# hacky way to use one param in another
if "webui" in password_providers:
# replace
password_providers["webui"] = (
get_password_from_webui(logger, status_exchange),
update_password_status_in_webui(status_exchange),
)

# start web server
if mfa_provider == MFAProvider.WEBUI:
server_thread = Thread(target=serve_app, daemon=True, args=[logger, status_exchange])
Expand Down
18 changes: 15 additions & 3 deletions src/icloudpd/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,33 @@ def index() -> Union[Response, str]:
@app.route("/status", methods=["GET"])
def get_status() -> Union[Response, str]:
_status = _status_exchange.get_status()
if _status == Status.NOT_NEED_MFA:
if _status == Status.NO_INPUT_NEEDED:
return render_template("no_input.html")
if _status == Status.NEED_MFA:
return render_template("code.html")
if _status == Status.NEED_PASSWORD:
return render_template("password.html")
return render_template("status.html", status=_status)

@app.route("/code", methods=["POST"])
def set_code() -> Union[Response, str]:
code = request.form.get("code")
if code is not None:
if _status_exchange.set_code(code):
return render_template("submitted.html")
if _status_exchange.set_payload(code):
return render_template("code_submitted.html")
else:
logger.error(f"cannot find code in request {request.form}")
return make_response(render_template("error.html"), 400) # incorrect code

@app.route("/password", methods=["POST"])
def set_password() -> Union[Response, str]:
password = request.form.get("password")
if password is not None:
if _status_exchange.set_payload(password):
return render_template("password_submitted.html")
else:
logger.error(f"cannot find password in request {request.form}")
return make_response(render_template("error.html"), 400) # incorrect code

logger.debug("Starting web server...")
return waitress.serve(app)
6 changes: 6 additions & 0 deletions src/icloudpd/server/templates/password.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<form hx-post="/password" hx-swap="outerHTML">
<p>Enter Password</p>
<label for="password">Password:</label>
<input type="password" id="password" name="password"/>
<button type="submit">Submit</button>
</form>
4 changes: 4 additions & 0 deletions src/icloudpd/server/templates/password_submitted.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<div id="zone">
<p>Password submitted and being checked</p>
<button hx-get="/status" hx-trigger="load delay:5sec" hx-target="#zone">Refresh</button>
</div>
30 changes: 20 additions & 10 deletions src/icloudpd/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@


class Status(Enum):
NOT_NEED_MFA = "not_need_mfa"
NO_INPUT_NEEDED = "no_input_needed"
NEED_MFA = "need_mfa"
SUPPLIED_MFA = "supplied_mfa"
CHECKING_MFA = "checking_mfa"
NEED_PASSWORD = "need_password"
SUPPLIED_PASSWORD = "supplied_password"
CHECKING_PASSWORD = "checking_password"


class StatusExchange:
def __init__(self) -> None:
self.lock = Lock()
self._status = Status.NOT_NEED_MFA
self._code: Optional[str] = None
self._status = Status.NO_INPUT_NEEDED
self._payload: Optional[str] = None

def get_status(self) -> Status:
with self.lock:
Expand All @@ -28,18 +31,25 @@ def replace_status(self, expected_status: Status, new_status: Status) -> bool:
else:
return False

def set_code(self, code: str) -> bool:
def set_payload(self, payload: str) -> bool:
with self.lock:
if self._status != Status.NEED_MFA:
if self._status != Status.NEED_MFA and self._status != Status.NEED_PASSWORD:
return False

self._code = code
self._status = Status.SUPPLIED_MFA
self._payload = payload
self._status = (
Status.SUPPLIED_MFA if self._status == Status.NEED_MFA else Status.SUPPLIED_PASSWORD
)
return True

def get_code(self) -> Optional[str]:
def get_payload(self) -> Optional[str]:
with self.lock:
if self._status not in [Status.SUPPLIED_MFA, Status.CHECKING_MFA]:
if self._status not in [
Status.SUPPLIED_MFA,
Status.CHECKING_MFA,
Status.SUPPLIED_PASSWORD,
Status.CHECKING_PASSWORD,
]:
return None

return self._code
return self._payload
Loading