Skip to content

Commit

Permalink
fix: Validate url domain for aws metadata urls (#1174)
Browse files Browse the repository at this point in the history
* fix: Validate url domain for aws metadata urls

* optimize tests
  • Loading branch information
sai-sunder-s authored Nov 7, 2022
1 parent b9aa92a commit f9d7d77
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 0 deletions.
19 changes: 19 additions & 0 deletions google/auth/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from six.moves import http_client
from six.moves import urllib
from six.moves.urllib.parse import urljoin
from six.moves.urllib.parse import urlparse

from google.auth import _helpers
from google.auth import environment_vars
Expand Down Expand Up @@ -397,6 +398,8 @@ def __init__(
self._request_signer = None
self._target_resource = audience

self.validate_metadata_server_urls()

# Get the environment ID. Currently, only one version supported (v1).
matches = re.match(r"^(aws)([\d]+)$", self._environment_id)
if matches:
Expand All @@ -413,6 +416,22 @@ def __init__(
)
)

def validate_metadata_server_urls(self):
self.validate_metadata_server_url_if_any(self._region_url, "region_url")
self.validate_metadata_server_url_if_any(self._security_credentials_url, "url")
self.validate_metadata_server_url_if_any(
self._imdsv2_session_token_url, "imdsv2_session_token_url"
)

@staticmethod
def validate_metadata_server_url_if_any(url_string, name_of_data):
if url_string:
url = urlparse(url_string)
if url.hostname != "169.254.169.254" and url.hostname != "fd00:ec2::254":
raise ValueError(
"Invalid hostname '{}' for '{}'".format(url.hostname, name_of_data)
)

def retrieve_subject_token(self, request):
"""Retrieves the subject token using the credential_source object.
The subject token is a serialized `AWS GetCallerIdentity signed request`_.
Expand Down
103 changes: 103 additions & 0 deletions tests/test_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@
REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
IMDSV2_SESSION_TOKEN_URL = "http://169.254.169.254/latest/api/token"
SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
REGION_URL_IPV6 = "http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone"
IMDSV2_SESSION_TOKEN_URL_IPV6 = "http://[fd00:ec2::254]/latest/api/token"
SECURITY_CREDS_URL_IPV6 = (
"http://[fd00:ec2::254]/latest/meta-data/iam/security-credentials"
)
CRED_VERIFICATION_URL = (
"https://sts.{region}.amazonaws.com?Action=GetCallerIdentity&Version=2011-06-15"
)
Expand Down Expand Up @@ -676,6 +681,13 @@ class TestCredentials(object):
"url": SECURITY_CREDS_URL,
"regional_cred_verification_url": CRED_VERIFICATION_URL,
}
CREDENTIAL_SOURCE_IPV6 = {
"environment_id": "aws1",
"region_url": REGION_URL_IPV6,
"url": SECURITY_CREDS_URL_IPV6,
"regional_cred_verification_url": CRED_VERIFICATION_URL,
"imdsv2_session_token_url": IMDSV2_SESSION_TOKEN_URL_IPV6,
}
SUCCESS_RESPONSE = {
"access_token": "ACCESS_TOKEN",
"issued_token_type": "urn:ietf:params:oauth:token-type:access_token",
Expand Down Expand Up @@ -1311,6 +1323,97 @@ def test_retrieve_subject_token_success_temp_creds_no_environment_vars_idmsv2(
},
)

def test_validate_metadata_server_url_if_any(self):
aws.Credentials.validate_metadata_server_url_if_any(
"http://[fd00:ec2::254]/latest/meta-data/placement/availability-zone", "url"
)
aws.Credentials.validate_metadata_server_url_if_any(
"http://169.254.169.254/latest/meta-data/placement/availability-zone", "url"
)

with pytest.raises(ValueError) as excinfo:
aws.Credentials.validate_metadata_server_url_if_any(
"http://fd00:ec2::254/latest/meta-data/placement/availability-zone",
"url",
)
assert excinfo.match("Invalid hostname 'fd00' for 'url'")

with pytest.raises(ValueError) as excinfo:
aws.Credentials.validate_metadata_server_url_if_any(
"http://abc.com/latest/meta-data/placement/availability-zone", "url"
)
assert excinfo.match("Invalid hostname 'abc.com' for 'url'")

def test_retrieve_subject_token_invalid_hosts(self):
keys = ["url", "region_url", "imdsv2_session_token_url"]
for key in keys:
credential_source = self.CREDENTIAL_SOURCE.copy()
credential_source[
key
] = "http://abc.com/latest/meta-data/iam/security-credentials"

with pytest.raises(ValueError) as excinfo:
self.make_credentials(credential_source=credential_source)
assert excinfo.match("Invalid hostname 'abc.com' for '{}'".format(key))

@mock.patch("google.auth._helpers.utcnow")
def test_retrieve_subject_token_success_ipv6(self, utcnow):
utcnow.return_value = datetime.datetime.strptime(
self.AWS_SIGNATURE_TIME, "%Y-%m-%dT%H:%M:%SZ"
)
request = self.make_mock_request(
region_status=http_client.OK,
region_name=self.AWS_REGION,
role_status=http_client.OK,
role_name=self.AWS_ROLE,
security_credentials_status=http_client.OK,
security_credentials_data=self.AWS_SECURITY_CREDENTIALS_RESPONSE,
imdsv2_session_token_status=http_client.OK,
imdsv2_session_token_data=self.AWS_IMDSV2_SESSION_TOKEN,
)
credential_source_token_url = self.CREDENTIAL_SOURCE_IPV6.copy()
credentials = self.make_credentials(
credential_source=credential_source_token_url
)

subject_token = credentials.retrieve_subject_token(request)

assert subject_token == self.make_serialized_aws_signed_request(
{
"access_key_id": ACCESS_KEY_ID,
"secret_access_key": SECRET_ACCESS_KEY,
"security_token": TOKEN,
}
)
# Assert session token request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[0][1],
IMDSV2_SESSION_TOKEN_URL_IPV6,
{"X-aws-ec2-metadata-token-ttl-seconds": "300"},
"PUT",
)
# Assert region request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[1][1],
REGION_URL_IPV6,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert role request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[2][1],
SECURITY_CREDS_URL_IPV6,
{"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN},
)
# Assert security credentials request.
self.assert_aws_metadata_request_kwargs(
request.call_args_list[3][1],
"{}/{}".format(SECURITY_CREDS_URL_IPV6, self.AWS_ROLE),
{
"Content-Type": "application/json",
"X-aws-ec2-metadata-token": self.AWS_IMDSV2_SESSION_TOKEN,
},
)

@mock.patch("google.auth._helpers.utcnow")
def test_retrieve_subject_token_session_error_idmsv2(self, utcnow):
utcnow.return_value = datetime.datetime.strptime(
Expand Down

0 comments on commit f9d7d77

Please sign in to comment.