From f97d96e3aeafd95834627997bfc969d8bc8a4cd4 Mon Sep 17 00:00:00 2001 From: Richard Kroegel <42204099+rikroe@users.noreply.github.com> Date: Thu, 28 Nov 2024 21:01:00 +0100 Subject: [PATCH] Add captcha to BMW ConfigFlow (#131351) Co-authored-by: Franck Nijhof --- .../bmw_connected_drive/config_flow.py | 71 ++++++++-- .../components/bmw_connected_drive/const.py | 5 + .../bmw_connected_drive/coordinator.py | 5 - .../bmw_connected_drive/strings.json | 10 ++ .../bmw_connected_drive/__init__.py | 9 +- .../snapshots/test_diagnostics.ambr | 6 +- .../bmw_connected_drive/test_config_flow.py | 121 ++++++++++-------- 7 files changed, 153 insertions(+), 74 deletions(-) diff --git a/homeassistant/components/bmw_connected_drive/config_flow.py b/homeassistant/components/bmw_connected_drive/config_flow.py index 409bfdca6f189..8831895c71eff 100644 --- a/homeassistant/components/bmw_connected_drive/config_flow.py +++ b/homeassistant/components/bmw_connected_drive/config_flow.py @@ -27,9 +27,18 @@ from homeassistant.core import HomeAssistant, callback from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.selector import SelectSelector, SelectSelectorConfig +from homeassistant.util.ssl import get_default_context from . import DOMAIN -from .const import CONF_ALLOWED_REGIONS, CONF_GCID, CONF_READ_ONLY, CONF_REFRESH_TOKEN +from .const import ( + CONF_ALLOWED_REGIONS, + CONF_CAPTCHA_REGIONS, + CONF_CAPTCHA_TOKEN, + CONF_CAPTCHA_URL, + CONF_GCID, + CONF_READ_ONLY, + CONF_REFRESH_TOKEN, +) DATA_SCHEMA = vol.Schema( { @@ -41,7 +50,14 @@ translation_key="regions", ) ), - } + }, + extra=vol.REMOVE_EXTRA, +) +CAPTCHA_SCHEMA = vol.Schema( + { + vol.Required(CONF_CAPTCHA_TOKEN): str, + }, + extra=vol.REMOVE_EXTRA, ) @@ -54,6 +70,8 @@ async def validate_input(hass: HomeAssistant, data: dict[str, Any]) -> dict[str, data[CONF_USERNAME], data[CONF_PASSWORD], get_region_from_name(data[CONF_REGION]), + hcaptcha_token=data.get(CONF_CAPTCHA_TOKEN), + verify=get_default_context(), ) try: @@ -79,15 +97,17 @@ class BMWConfigFlow(ConfigFlow, domain=DOMAIN): VERSION = 1 + data: dict[str, Any] = {} + _existing_entry_data: Mapping[str, Any] | None = None async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> ConfigFlowResult: """Handle the initial step.""" - errors: dict[str, str] = {} + errors: dict[str, str] = self.data.pop("errors", {}) - if user_input is not None: + if user_input is not None and not errors: unique_id = f"{user_input[CONF_REGION]}-{user_input[CONF_USERNAME]}" await self.async_set_unique_id(unique_id) @@ -96,22 +116,35 @@ async def async_step_user( else: self._abort_if_unique_id_configured() + # Store user input for later use + self.data.update(user_input) + + # North America and Rest of World require captcha token + if ( + self.data.get(CONF_REGION) in CONF_CAPTCHA_REGIONS + and CONF_CAPTCHA_TOKEN not in self.data + ): + return await self.async_step_captcha() + info = None try: - info = await validate_input(self.hass, user_input) - entry_data = { - **user_input, - CONF_REFRESH_TOKEN: info.get(CONF_REFRESH_TOKEN), - CONF_GCID: info.get(CONF_GCID), - } + info = await validate_input(self.hass, self.data) except MissingCaptcha: errors["base"] = "missing_captcha" except CannotConnect: errors["base"] = "cannot_connect" except InvalidAuth: errors["base"] = "invalid_auth" + finally: + self.data.pop(CONF_CAPTCHA_TOKEN, None) if info: + entry_data = { + **self.data, + CONF_REFRESH_TOKEN: info.get(CONF_REFRESH_TOKEN), + CONF_GCID: info.get(CONF_GCID), + } + if self.source == SOURCE_REAUTH: return self.async_update_reload_and_abort( self._get_reauth_entry(), data=entry_data @@ -128,7 +161,7 @@ async def async_step_user( schema = self.add_suggested_values_to_schema( DATA_SCHEMA, - self._existing_entry_data, + self._existing_entry_data or self.data, ) return self.async_show_form(step_id="user", data_schema=schema, errors=errors) @@ -147,6 +180,22 @@ async def async_step_reconfigure( self._existing_entry_data = self._get_reconfigure_entry().data return await self.async_step_user() + async def async_step_captcha( + self, user_input: dict[str, Any] | None = None + ) -> ConfigFlowResult: + """Show captcha form.""" + if user_input and user_input.get(CONF_CAPTCHA_TOKEN): + self.data[CONF_CAPTCHA_TOKEN] = user_input[CONF_CAPTCHA_TOKEN].strip() + return await self.async_step_user(self.data) + + return self.async_show_form( + step_id="captcha", + data_schema=CAPTCHA_SCHEMA, + description_placeholders={ + "captcha_url": CONF_CAPTCHA_URL.format(region=self.data[CONF_REGION]) + }, + ) + @staticmethod @callback def async_get_options_flow( diff --git a/homeassistant/components/bmw_connected_drive/const.py b/homeassistant/components/bmw_connected_drive/const.py index 98d4acbfc91d8..750289e9d0a0d 100644 --- a/homeassistant/components/bmw_connected_drive/const.py +++ b/homeassistant/components/bmw_connected_drive/const.py @@ -8,10 +8,15 @@ ATTR_VIN = "vin" CONF_ALLOWED_REGIONS = ["china", "north_america", "rest_of_world"] +CONF_CAPTCHA_REGIONS = ["north_america", "rest_of_world"] CONF_READ_ONLY = "read_only" CONF_ACCOUNT = "account" CONF_REFRESH_TOKEN = "refresh_token" CONF_GCID = "gcid" +CONF_CAPTCHA_TOKEN = "captcha_token" +CONF_CAPTCHA_URL = ( + "https://bimmer-connected.readthedocs.io/en/stable/captcha/{region}.html" +) DATA_HASS_CONFIG = "hass_config" diff --git a/homeassistant/components/bmw_connected_drive/coordinator.py b/homeassistant/components/bmw_connected_drive/coordinator.py index d38b7ffacc2a7..4f560d16f9cd1 100644 --- a/homeassistant/components/bmw_connected_drive/coordinator.py +++ b/homeassistant/components/bmw_connected_drive/coordinator.py @@ -84,11 +84,6 @@ async def _async_update_data(self) -> None: if self.account.refresh_token != old_refresh_token: self._update_config_entry_refresh_token(self.account.refresh_token) - _LOGGER.debug( - "bimmer_connected: refresh token %s > %s", - old_refresh_token, - self.account.refresh_token, - ) def _update_config_entry_refresh_token(self, refresh_token: str | None) -> None: """Update or delete the refresh_token in the Config Entry.""" diff --git a/homeassistant/components/bmw_connected_drive/strings.json b/homeassistant/components/bmw_connected_drive/strings.json index 0e7a4a32ef45e..8078971acd170 100644 --- a/homeassistant/components/bmw_connected_drive/strings.json +++ b/homeassistant/components/bmw_connected_drive/strings.json @@ -7,6 +7,16 @@ "password": "[%key:common::config_flow::data::password%]", "region": "ConnectedDrive Region" } + }, + "captcha": { + "title": "Are you a robot?", + "description": "A captcha is required for BMW login. Visit the external website to complete the challenge and submit the form. Copy the resulting token into the field below.\n\n{captcha_url}\n\nNo data will be exposed outside of your Home Assistant instance.", + "data": { + "captcha_token": "Captcha token" + }, + "data_description": { + "captcha_token": "One-time token retrieved from the captcha challenge." + } } }, "error": { diff --git a/tests/components/bmw_connected_drive/__init__.py b/tests/components/bmw_connected_drive/__init__.py index 4d280a1d0e5c8..f490b85474915 100644 --- a/tests/components/bmw_connected_drive/__init__.py +++ b/tests/components/bmw_connected_drive/__init__.py @@ -9,6 +9,7 @@ from homeassistant import config_entries from homeassistant.components.bmw_connected_drive.const import ( + CONF_CAPTCHA_TOKEN, CONF_GCID, CONF_READ_ONLY, CONF_REFRESH_TOKEN, @@ -24,8 +25,12 @@ CONF_PASSWORD: "p4ssw0rd", CONF_REGION: "rest_of_world", } -FIXTURE_REFRESH_TOKEN = "SOME_REFRESH_TOKEN" -FIXTURE_GCID = "SOME_GCID" +FIXTURE_CAPTCHA_INPUT = { + CONF_CAPTCHA_TOKEN: "captcha_token", +} +FIXTURE_USER_INPUT_W_CAPTCHA = FIXTURE_USER_INPUT | FIXTURE_CAPTCHA_INPUT +FIXTURE_REFRESH_TOKEN = "another_token_string" +FIXTURE_GCID = "DUMMY" FIXTURE_CONFIG_ENTRY = { "entry_id": "1", diff --git a/tests/components/bmw_connected_drive/snapshots/test_diagnostics.ambr b/tests/components/bmw_connected_drive/snapshots/test_diagnostics.ambr index 81ef122006976..b87da22a332ee 100644 --- a/tests/components/bmw_connected_drive/snapshots/test_diagnostics.ambr +++ b/tests/components/bmw_connected_drive/snapshots/test_diagnostics.ambr @@ -4833,7 +4833,7 @@ }), ]), 'info': dict({ - 'gcid': 'SOME_GCID', + 'gcid': 'DUMMY', 'password': '**REDACTED**', 'refresh_token': '**REDACTED**', 'region': 'rest_of_world', @@ -7202,7 +7202,7 @@ }), ]), 'info': dict({ - 'gcid': 'SOME_GCID', + 'gcid': 'DUMMY', 'password': '**REDACTED**', 'refresh_token': '**REDACTED**', 'region': 'rest_of_world', @@ -8925,7 +8925,7 @@ }), ]), 'info': dict({ - 'gcid': 'SOME_GCID', + 'gcid': 'DUMMY', 'password': '**REDACTED**', 'refresh_token': '**REDACTED**', 'region': 'rest_of_world', diff --git a/tests/components/bmw_connected_drive/test_config_flow.py b/tests/components/bmw_connected_drive/test_config_flow.py index f57f1a304ac01..8fa9d9be22b64 100644 --- a/tests/components/bmw_connected_drive/test_config_flow.py +++ b/tests/components/bmw_connected_drive/test_config_flow.py @@ -4,17 +4,14 @@ from unittest.mock import patch from bimmer_connected.api.authentication import MyBMWAuthentication -from bimmer_connected.models import ( - MyBMWAPIError, - MyBMWAuthError, - MyBMWCaptchaMissingError, -) +from bimmer_connected.models import MyBMWAPIError, MyBMWAuthError from httpx import RequestError import pytest from homeassistant import config_entries from homeassistant.components.bmw_connected_drive.config_flow import DOMAIN from homeassistant.components.bmw_connected_drive.const import ( + CONF_CAPTCHA_TOKEN, CONF_READ_ONLY, CONF_REFRESH_TOKEN, ) @@ -23,10 +20,12 @@ from homeassistant.data_entry_flow import FlowResultType from . import ( + FIXTURE_CAPTCHA_INPUT, FIXTURE_CONFIG_ENTRY, FIXTURE_GCID, FIXTURE_REFRESH_TOKEN, FIXTURE_USER_INPUT, + FIXTURE_USER_INPUT_W_CAPTCHA, ) from tests.common import MockConfigEntry @@ -61,7 +60,7 @@ async def test_authentication_error(hass: HomeAssistant) -> None: result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_USER}, - data=FIXTURE_USER_INPUT, + data=deepcopy(FIXTURE_USER_INPUT_W_CAPTCHA), ) assert result["type"] is FlowResultType.FORM @@ -79,7 +78,7 @@ async def test_connection_error(hass: HomeAssistant) -> None: result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_USER}, - data=FIXTURE_USER_INPUT, + data=deepcopy(FIXTURE_USER_INPUT_W_CAPTCHA), ) assert result["type"] is FlowResultType.FORM @@ -97,7 +96,7 @@ async def test_api_error(hass: HomeAssistant) -> None: result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_USER}, - data=deepcopy(FIXTURE_USER_INPUT), + data=deepcopy(FIXTURE_USER_INPUT_W_CAPTCHA), ) assert result["type"] is FlowResultType.FORM @@ -105,6 +104,28 @@ async def test_api_error(hass: HomeAssistant) -> None: assert result["errors"] == {"base": "cannot_connect"} +@pytest.mark.usefixtures("bmw_fixture") +async def test_captcha_flow_missing_error(hass: HomeAssistant) -> None: + """Test the external flow with captcha failing once and succeeding the second time.""" + + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_USER}, + data=deepcopy(FIXTURE_USER_INPUT), + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "captcha" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], {CONF_CAPTCHA_TOKEN: " "} + ) + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "user" + assert result["errors"] == {"base": "missing_captcha"} + + async def test_full_user_flow_implementation(hass: HomeAssistant) -> None: """Test registering an integration and finishing flow works.""" with ( @@ -118,14 +139,22 @@ async def test_full_user_flow_implementation(hass: HomeAssistant) -> None: return_value=True, ) as mock_setup_entry, ): - result2 = await hass.config_entries.flow.async_init( + result = await hass.config_entries.flow.async_init( DOMAIN, context={"source": config_entries.SOURCE_USER}, data=deepcopy(FIXTURE_USER_INPUT), ) - assert result2["type"] is FlowResultType.CREATE_ENTRY - assert result2["title"] == FIXTURE_COMPLETE_ENTRY[CONF_USERNAME] - assert result2["data"] == FIXTURE_COMPLETE_ENTRY + + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "captcha" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], FIXTURE_CAPTCHA_INPUT + ) + + assert result["type"] is FlowResultType.CREATE_ENTRY + assert result["title"] == FIXTURE_COMPLETE_ENTRY[CONF_USERNAME] + assert result["data"] == FIXTURE_COMPLETE_ENTRY assert len(mock_setup_entry.mock_calls) == 1 @@ -206,13 +235,20 @@ async def test_reauth(hass: HomeAssistant) -> None: assert suggested_values[CONF_PASSWORD] == wrong_password assert suggested_values[CONF_REGION] == FIXTURE_USER_INPUT[CONF_REGION] - result2 = await hass.config_entries.flow.async_configure( - result["flow_id"], FIXTURE_USER_INPUT + result = await hass.config_entries.flow.async_configure( + result["flow_id"], deepcopy(FIXTURE_USER_INPUT) ) await hass.async_block_till_done() - assert result2["type"] is FlowResultType.ABORT - assert result2["reason"] == "reauth_successful" + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "captcha" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], FIXTURE_CAPTCHA_INPUT + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reauth_successful" assert config_entry.data == FIXTURE_COMPLETE_ENTRY assert len(mock_setup_entry.mock_calls) == 2 @@ -243,13 +279,13 @@ async def test_reauth_unique_id_abort(hass: HomeAssistant) -> None: assert result["step_id"] == "user" assert result["errors"] == {} - result2 = await hass.config_entries.flow.async_configure( + result = await hass.config_entries.flow.async_configure( result["flow_id"], {**FIXTURE_USER_INPUT, CONF_REGION: "north_america"} ) await hass.async_block_till_done() - assert result2["type"] is FlowResultType.ABORT - assert result2["reason"] == "account_mismatch" + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "account_mismatch" assert config_entry.data == config_entry_with_wrong_password["data"] @@ -279,13 +315,20 @@ async def test_reconfigure(hass: HomeAssistant) -> None: assert suggested_values[CONF_PASSWORD] == FIXTURE_USER_INPUT[CONF_PASSWORD] assert suggested_values[CONF_REGION] == FIXTURE_USER_INPUT[CONF_REGION] - result2 = await hass.config_entries.flow.async_configure( + result = await hass.config_entries.flow.async_configure( result["flow_id"], FIXTURE_USER_INPUT ) await hass.async_block_till_done() - assert result2["type"] is FlowResultType.ABORT - assert result2["reason"] == "reconfigure_successful" + assert result["type"] is FlowResultType.FORM + assert result["step_id"] == "captcha" + + result = await hass.config_entries.flow.async_configure( + result["flow_id"], FIXTURE_CAPTCHA_INPUT + ) + + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "reconfigure_successful" assert config_entry.data == FIXTURE_COMPLETE_ENTRY @@ -307,40 +350,12 @@ async def test_reconfigure_unique_id_abort(hass: HomeAssistant) -> None: assert result["step_id"] == "user" assert result["errors"] == {} - result2 = await hass.config_entries.flow.async_configure( + result = await hass.config_entries.flow.async_configure( result["flow_id"], {**FIXTURE_USER_INPUT, CONF_USERNAME: "somebody@email.com"}, ) await hass.async_block_till_done() - assert result2["type"] is FlowResultType.ABORT - assert result2["reason"] == "account_mismatch" + assert result["type"] is FlowResultType.ABORT + assert result["reason"] == "account_mismatch" assert config_entry.data == FIXTURE_COMPLETE_ENTRY - - -@pytest.mark.usefixtures("bmw_fixture") -async def test_captcha_flow_not_set(hass: HomeAssistant) -> None: - """Test the external flow with captcha failing once and succeeding the second time.""" - - TEST_REGION = "north_america" - - # Start flow and open form - # Start flow and open form - result = await hass.config_entries.flow.async_init( - DOMAIN, context={"source": config_entries.SOURCE_USER} - ) - assert result["type"] is FlowResultType.FORM - assert result["step_id"] == "user" - - # Add login data - with patch( - "bimmer_connected.api.authentication.MyBMWAuthentication._login_row_na", - side_effect=MyBMWCaptchaMissingError( - "Missing hCaptcha token for North America login" - ), - ): - result = await hass.config_entries.flow.async_configure( - result["flow_id"], - user_input={**FIXTURE_USER_INPUT, CONF_REGION: TEST_REGION}, - ) - assert result["errors"]["base"] == "missing_captcha"