Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer Region in UserAgent-header #3099

Merged
merged 6 commits into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions moto/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import types
from io import BytesIO
from collections import defaultdict
from botocore.config import Config
from botocore.handlers import BUILTIN_HANDLERS
from botocore.awsrequest import AWSResponse
from six.moves.urllib.parse import urlparse
Expand Down Expand Up @@ -416,6 +417,13 @@ def enable_patching(self):
import mock

def fake_boto3_client(*args, **kwargs):
region = self._get_region(*args, **kwargs)
if region:
if "config" in kwargs:
kwargs["config"].__dict__["user_agent_extra"] += " region/" + region
else:
config = Config(user_agent_extra="region/" + region)
kwargs["config"] = config
if "endpoint_url" not in kwargs:
kwargs["endpoint_url"] = "http://localhost:5000"
return real_boto3_client(*args, **kwargs)
Expand Down Expand Up @@ -463,6 +471,14 @@ def _convert_to_bytes(mixed_buffer):
if six.PY2:
self._httplib_patcher.start()

def _get_region(self, *args, **kwargs):
if "region_name" in kwargs:
return kwargs["region_name"]
if type(args) == tuple and len(args) == 2:
service, region = args
return region
return None

def disable_patching(self):
if self._client_patcher:
self._client_patcher.stop()
Expand Down
14 changes: 11 additions & 3 deletions moto/core/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ class BaseResponse(_TemplateEnvironmentMixin, ActionAuthenticatorMixin):
default_region = "us-east-1"
# to extract region, use [^.]
region_regex = re.compile(r"\.(?P<region>[a-z]{2}-[a-z]+-\d{1})\.amazonaws\.com")
region_from_useragent_regex = re.compile(
r"region/(?P<region>[a-z]{2}-[a-z]+-\d{1})"
)
param_list_regex = re.compile(r"(.*)\.(\d+)\.")
access_key_regex = re.compile(
r"AWS.*(?P<access_key>(?<![A-Z0-9])[A-Z0-9]{20}(?![A-Z0-9]))[:/]"
Expand Down Expand Up @@ -272,9 +275,14 @@ def setup_class(self, request, full_url, headers):
self.response_headers = {"server": "amazon.com"}

def get_region_from_url(self, request, full_url):
match = self.region_regex.search(full_url)
if match:
region = match.group(1)
url_match = self.region_regex.search(full_url)
user_agent_match = self.region_from_useragent_regex.search(
request.headers.get("User-Agent", "")
)
if url_match:
region = url_match.group(1)
elif user_agent_match:
region = user_agent_match.group(1)
elif (
"Authorization" in request.headers
and "AWS4" in request.headers["Authorization"]
Expand Down
32 changes: 32 additions & 0 deletions tests/test_cognitoidp/test_cognitoidp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,38 @@ def test_change_password():
result["AuthenticationResult"].should_not.be.none


@mock_cognitoidp
def test_change_password__using_custom_user_agent_header():
# https://github.com/spulec/moto/issues/3098
# As the admin_initiate_auth-method is unauthenticated, we use the user-agent header to pass in the region
# This test verifies this works, even if we pass in our own user-agent header
from botocore.config import Config

my_config = Config(user_agent_extra="more/info", signature_version="v4")
conn = boto3.client("cognito-idp", "us-west-2", config=my_config)

outputs = authentication_flow(conn)

# Take this opportunity to test change_password, which requires an access token.
newer_password = str(uuid.uuid4())
conn.change_password(
AccessToken=outputs["access_token"],
PreviousPassword=outputs["password"],
ProposedPassword=newer_password,
)

# Log in again, which should succeed without a challenge because the user is no
# longer in the force-new-password state.
result = conn.admin_initiate_auth(
UserPoolId=outputs["user_pool_id"],
ClientId=outputs["client_id"],
AuthFlow="ADMIN_NO_SRP_AUTH",
AuthParameters={"USERNAME": outputs["username"], "PASSWORD": newer_password},
)

result["AuthenticationResult"].should_not.be.none


@mock_cognitoidp
def test_forgot_password():
conn = boto3.client("cognito-idp", "us-west-2")
Expand Down