Skip to content

Commit

Permalink
format validations
Browse files Browse the repository at this point in the history
  • Loading branch information
Chuan Ren committed Apr 6, 2022
1 parent d276d52 commit c9d304d
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 5 deletions.
14 changes: 11 additions & 3 deletions google/auth/pluggable.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,24 +269,32 @@ def from_file(cls, filename, **kwargs):
return cls.from_info(data, **kwargs)

def _parse_subject_token(self, response):
if "version" in response and response["version"] > EXECUTABLE_SUPPORTED_MAX_VERSION:
if not "version" in response:
raise ValueError("The executable response is missing the version field.")
if response["version"] > EXECUTABLE_SUPPORTED_MAX_VERSION:
raise exceptions.RefreshError(
"Executable returned unsupported version {}.".format(
response["version"]
)
)
if not "success" in response:
raise ValueError("The executable response is missing the success field.")
if not response["success"]:
if not response["code"] or not response["message"]:
raise ValueError("Code and message are required in the response.")
if "code" not in response or "message" not in response:
raise ValueError("Error code and message fields are required in the response.")
raise exceptions.RefreshError(
"Executable returned unsuccessful response: code: {}, message: {}.".format(
response["code"], response["message"]
)
)
if not "expiration_time" in response:
raise ValueError("The executable response is missing the expiration_time field.")
if response["expiration_time"] < time.time():
raise exceptions.RefreshError(
"The token returned by the executable is expired."
)
if not "token_type" in response:
raise ValueError("The executable response is missing the token_type field.")
if (
response["token_type"] == "urn:ietf:params:oauth:token-type:jwt"
or response["token_type"] == "urn:ietf:params:oauth:token-type:id_token"
Expand Down
109 changes: 107 additions & 2 deletions tests/test_pluggable.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ def test_retrieve_subject_token_oidc_id_token(self, fp):
fp.register(
self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(),
stdout=json.dumps(self.EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_ID_TOKEN),

)

credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE)
Expand Down Expand Up @@ -520,7 +521,7 @@ def test_retrieve_subject_token_invalid_version(self, fp):
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"Executable returned unsupported version.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_expired_token(self, fp):
EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE_EXPIRED = {
Expand Down Expand Up @@ -577,4 +578,108 @@ def test_retrieve_subject_token_unsupported_token_type(self, fp):
with pytest.raises(exceptions.RefreshError) as excinfo:
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"Executable returned unsupported token type.")
assert excinfo.match(r"Executable returned unsupported token type.")


@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_missing_version(self, fp):
EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {
"success": True,
"token_type": "urn:ietf:params:oauth:token-type:id_token",
"id_token": self.EXECUTABLE_OIDC_TOKEN,
"expiration_time": 9999999999,
}

fp.register(
self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(),
stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE),
)

credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE)

with pytest.raises(ValueError) as excinfo:
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"The executable response is missing the version field.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_missing_success(self, fp):
EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {
"version": 1,
"token_type": "urn:ietf:params:oauth:token-type:id_token",
"id_token": self.EXECUTABLE_OIDC_TOKEN,
"expiration_time": 9999999999,
}

fp.register(
self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(),
stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE),
)

credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE)

with pytest.raises(ValueError) as excinfo:
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"The executable response is missing the success field.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_missing_error_code_message(self, fp):
EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {
"version": 1,
"success": False,
}

fp.register(
self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(),
stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE),
)

credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE)

with pytest.raises(ValueError) as excinfo:
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"Error code and message fields are required in the response.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_missing_expiration_time(self, fp):
EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {
"version": 1,
"success": True,
"token_type": "urn:ietf:params:oauth:token-type:id_token",
"id_token": self.EXECUTABLE_OIDC_TOKEN,
}

fp.register(
self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(),
stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE),
)

credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE)

with pytest.raises(ValueError) as excinfo:
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"The executable response is missing the expiration_time field.")

@mock.patch.dict(os.environ, {"GOOGLE_EXTERNAL_ACCOUNT_ALLOW_EXECUTABLES": "1"})
def test_retrieve_subject_token_missing_token_type(self, fp):
EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE = {
"version": 1,
"success": True,
"id_token": self.EXECUTABLE_OIDC_TOKEN,
"expiration_time": 9999999999,
}

fp.register(
self.CREDENTIAL_SOURCE_EXECUTABLE_COMMAND.split(),
stdout=json.dumps(EXECUTABLE_SUCCESSFUL_OIDC_RESPONSE),
)

credentials = self.make_pluggable(credential_source=self.CREDENTIAL_SOURCE)

with pytest.raises(ValueError) as excinfo:
subject_token = credentials.retrieve_subject_token(None)

assert excinfo.match(r"The executable response is missing the token_type field.")

0 comments on commit c9d304d

Please sign in to comment.