Skip to content

Commit

Permalink
feat: update cognito user pool events and add new trigger events
Browse files Browse the repository at this point in the history
  • Loading branch information
knightmre authored and mengs3 committed Jun 12, 2024
1 parent 85d82d7 commit 036b3e0
Show file tree
Hide file tree
Showing 6 changed files with 481 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,18 @@ def force_alias_creation(self, value: bool):
"""
self["response"]["forceAliasCreation"] = value

@property
def enable_sms_mfa(self) -> Optional[bool]:
return self["response"].get("enableSMSMFA")

@enable_sms_mfa.setter
def enable_sms_mfa(self, value: bool):
"""Set this parameter to "true" to require that your migrated user complete SMS text message multi-factor
authentication (MFA) to sign in. Your user pool must have MFA enabled. Your user's attributes
in the request parameters must include a phone number, or else the migration of that user will fail.
"""
self["response"]["enableSMSMFA"] = value


class UserMigrationTriggerEvent(BaseTriggerEvent):
"""Migrate User Lambda Trigger
Expand Down Expand Up @@ -270,6 +282,11 @@ def code_parameter(self) -> str:
"""A string for you to use as the placeholder for the verification code in the custom message."""
return self["request"]["codeParameter"]

@property
def link_parameter(self) -> str:
"""A string for you to use as a placeholder for the verification link in the custom message."""
return self["request"]["linkParameter"]

@property
def username_parameter(self) -> str:
"""The username parameter. It is a required request parameter for the admin create user flow."""
Expand Down Expand Up @@ -466,6 +483,16 @@ def client_metadata(self) -> Optional[Dict[str, str]]:
return self["request"].get("clientMetadata")


class PreTokenGenerationTriggerV2EventRequest(PreTokenGenerationTriggerEventRequest):
@property
def scopes(self) -> List[str]:
"""Your user's OAuth 2.0 scopes. The scopes that are present in an access token are
the user pool standard and custom scopes that your user requested,
and that you authorized your app client to issue.
"""
return self["request"].get("scopes")


class ClaimsOverrideDetails(DictWrapper):
@property
def claims_to_add_or_override(self) -> Optional[Dict[str, str]]:
Expand Down Expand Up @@ -520,6 +547,123 @@ def set_group_configuration_preferred_role(self, value: str):
self["groupOverrideDetails"]["preferredRole"] = value


class TokenClaimsAndScopeOverrideDetails(DictWrapper):
@property
def claims_to_add_or_override(self) -> Dict[str, str]:
return self.get("claimsToAddOrOverride") or {}

@claims_to_add_or_override.setter
def claims_to_add_or_override(self, value: Dict[str, str]):
"""A map of one or more key-value pairs of claims to add or override.
For group related claims, use groupOverrideDetails instead."""
self._data["claimsToAddOrOverride"] = value

@property
def claims_to_suppress(self) -> List[str]:
return self.get("claimsToSuppress") or []

@claims_to_suppress.setter
def claims_to_suppress(self, value: List[str]):
"""A list that contains claims to be suppressed from the identity token."""
self._data["claimsToSuppress"] = value

@property
def scopes_to_add(self) -> List[str]:
return self.get("scopesToAdd") or []

@scopes_to_add.setter
def scopes_to_add(self, value: List[str]):
self._data["scopesToAdd"] = value

@property
def scopes_to_suppress(self) -> List[str]:
return self.get("scopesToSuppress") or []

@scopes_to_suppress.setter
def scopes_to_suppress(self, value: List[str]):
self._data["scopesToSuppress"] = value


class ClaimsAndScopeOverrideDetails(DictWrapper):

@property
def id_token_generation(self) -> Optional[TokenClaimsAndScopeOverrideDetails]:
id_token_generation_details = self._data.get("idTokenGeneration")
return (
None
if id_token_generation_details is None
else TokenClaimsAndScopeOverrideDetails(id_token_generation_details)
)

@id_token_generation.setter
def id_token_generation(self, value: Dict[str, Any]):
"""The output object containing the current id token's claims and scope configuration.
It includes claimsToAddOrOverride, claimsToSuppress, scopesToAdd and scopesToSupprress.
The tokenClaimsAndScopeOverrideDetails object is replaced with the one you provide.
If you provide an empty or null object in the response, then the groups are suppressed.
To leave the existing group configuration as is, copy the value of the token's object
to the tokenClaimsAndScopeOverrideDetails object in the response, and pass it back to the service.
"""
self._data["idTokenGeneration"] = value

@property
def access_token_generation(self) -> Optional[TokenClaimsAndScopeOverrideDetails]:
access_token_generation_details = self._data.get("accessTokenGeneration")
return (
None
if access_token_generation_details is None
else TokenClaimsAndScopeOverrideDetails(access_token_generation_details)
)

@access_token_generation.setter
def access_token_generation(self, value: Dict[str, Any]):
"""The output object containing the current access token's claims and scope configuration.
It includes claimsToAddOrOverride, claimsToSuppress, scopesToAdd and scopesToSupprress.
The tokenClaimsAndScopeOverrideDetails object is replaced with the one you provide.
If you provide an empty or null object in the response, then the groups are suppressed.
To leave the existing group configuration as is, copy the value of the token's object to
the tokenClaimsAndScopeOverrideDetails object in the response, and pass it back to the service.
"""
self._data["accessTokenGeneration"] = value

@property
def group_configuration(self) -> Optional[GroupOverrideDetails]:
group_override_details = self.get("groupOverrideDetails")
return None if group_override_details is None else GroupOverrideDetails(group_override_details)

@group_configuration.setter
def group_configuration(self, value: Dict[str, Any]):
"""The output object containing the current group configuration.
It includes groupsToOverride, iamRolesToOverride, and preferredRole.
The groupOverrideDetails object is replaced with the one you provide. If you provide an empty or null
object in the response, then the groups are suppressed. To leave the existing group configuration
as is, copy the value of the request's groupConfiguration object to the groupOverrideDetails object
in the response, and pass it back to the service.
"""
self._data["groupOverrideDetails"] = value

def set_group_configuration_groups_to_override(self, value: List[str]):
"""A list of the group names that are associated with the user that the identity token is issued for."""
self._data.setdefault("groupOverrideDetails", {})
self["groupOverrideDetails"]["groupsToOverride"] = value

def set_group_configuration_iam_roles_to_override(self, value: List[str]):
"""A list of the current IAM roles associated with these groups."""
self._data.setdefault("groupOverrideDetails", {})
self["groupOverrideDetails"]["iamRolesToOverride"] = value

def set_group_configuration_preferred_role(self, value: str):
"""A string indicating the preferred IAM role."""
self._data.setdefault("groupOverrideDetails", {})
self["groupOverrideDetails"]["preferredRole"] = value


class PreTokenGenerationTriggerEventResponse(DictWrapper):
@property
def claims_override_details(self) -> ClaimsOverrideDetails:
Expand All @@ -529,6 +673,15 @@ def claims_override_details(self) -> ClaimsOverrideDetails:
return ClaimsOverrideDetails(self._data["response"]["claimsOverrideDetails"])


class PreTokenGenerationTriggerV2EventResponse(DictWrapper):
@property
def claims_scope_override_details(self) -> ClaimsAndScopeOverrideDetails:
# Ensure we have a `claimsAndScopeOverrideDetails` element and is not set to None
if self._data["response"].get("claimsAndScopeOverrideDetails") is None:
self._data["response"]["claimsAndScopeOverrideDetails"] = {}
return ClaimsAndScopeOverrideDetails(self._data["response"]["claimsAndScopeOverrideDetails"])


class PreTokenGenerationTriggerEvent(BaseTriggerEvent):
"""Pre Token Generation Lambda Trigger
Expand Down Expand Up @@ -561,6 +714,38 @@ def response(self) -> PreTokenGenerationTriggerEventResponse:
return PreTokenGenerationTriggerEventResponse(self._data)


class PreTokenGenerationV2TriggerEvent(BaseTriggerEvent):
"""Pre Token Generation Lambda Trigger for the V2 Event
Amazon Cognito invokes this trigger before token generation allowing you to customize identity token claims.
Notes:
----
`triggerSource` can be one of the following:
- `TokenGeneration_HostedAuth` Called during authentication from the Amazon Cognito hosted UI sign-in page.
- `TokenGeneration_Authentication` Called after user authentication flows have completed.
- `TokenGeneration_NewPasswordChallenge` Called after the user is created by an admin. This flow is invoked
when the user has to change a temporary password.
- `TokenGeneration_AuthenticateDevice` Called at the end of the authentication of a user device.
- `TokenGeneration_RefreshTokens` Called when a user tries to refresh the identity and access tokens.
Documentation:
--------------
- https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-lambda-pre-token-generation.html
"""

@property
def request(self) -> PreTokenGenerationTriggerV2EventRequest:
"""Pre Token Generation Request V2 Parameters"""
return PreTokenGenerationTriggerV2EventRequest(self._data)

@property
def response(self) -> PreTokenGenerationTriggerV2EventResponse:
"""Pre Token Generation Response V2 Parameters"""
return PreTokenGenerationTriggerV2EventResponse(self._data)


class ChallengeResult(DictWrapper):
@property
def challenge_name(self) -> str:
Expand Down Expand Up @@ -822,3 +1007,77 @@ def request(self) -> VerifyAuthChallengeResponseTriggerEventRequest:
def response(self) -> VerifyAuthChallengeResponseTriggerEventResponse:
"""Verify Auth Challenge Response Parameters"""
return VerifyAuthChallengeResponseTriggerEventResponse(self._data)


class CustomEmailSenderTriggerEventRequest(DictWrapper):
@property
def type(self) -> str:
"""The request version. For a custom email sender event, the value of this string
is always customEmailSenderRequestV1.
"""
return self["request"]["type"]

@property
def code(self) -> str:
"""The encrypted code that your function can decrypt and send to your user."""
return self["request"]["code"]

@property
def user_attributes(self) -> Dict[str, str]:
"""One or more name-value pairs representing user attributes. The attribute names are the keys."""
return self["request"]["userAttributes"]

@property
def client_metadata(self) -> Dict[str, str]:
"""One or more key-value pairs that you can provide as custom input to the
custom email sender Lambda function trigger. To pass this data to your Lambda function,
you can use the ClientMetadata parameter in the AdminRespondToAuthChallenge and
RespondToAuthChallenge API actions. Amazon Cognito doesn't include data from the
ClientMetadata parameter in AdminInitiateAuth and InitiateAuth API operations
in the request that it passes to the post authentication function.
"""
return self["request"].get("clientMetadata") or {}


class CustomEmailSenderTriggerEvent(BaseTriggerEvent):
@property
def request(self) -> CustomEmailSenderTriggerEventRequest:
"""Custom Email Sender Request Parameters"""
return CustomEmailSenderTriggerEventRequest(self._data)


class CustomSMSSenderTriggerEventRequest(DictWrapper):
@property
def type(self) -> str:
"""The request version. For a custom SMS sender event, the value of this string is always
customSMSSenderRequestV1.
"""
return self["request"]["type"]

@property
def code(self) -> str:
"""The encrypted code that your function can decrypt and send to your user."""
return self["request"]["code"]

@property
def user_attributes(self) -> Dict[str, str]:
"""One or more name-value pairs representing user attributes. The attribute names are the keys."""
return self["request"]["userAttributes"]

@property
def client_metadata(self) -> Dict[str, str]:
"""One or more key-value pairs that you can provide as custom input to the
custom SMS sender Lambda function trigger. To pass this data to your Lambda function,
you can use the ClientMetadata parameter in the AdminRespondToAuthChallenge and
RespondToAuthChallenge API actions. Amazon Cognito doesn't include data from the
ClientMetadata parameter in AdminInitiateAuth and InitiateAuth API operations
in the request that it passes to the post authentication function.
"""
return self["request"].get("clientMetadata") or {}


class CustomSMSSenderTriggerEvent(BaseTriggerEvent):
@property
def request(self) -> CustomSMSSenderTriggerEventRequest:
"""Custom SMS Sender Request Parameters"""
return CustomSMSSenderTriggerEventRequest(self._data)
19 changes: 19 additions & 0 deletions tests/events/cognitoCustomEmailSenderEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "1",
"triggerSource": "CustomEmailSender_SignUp",
"region": "region",
"userPoolId": "userPoolId",
"userName": "userName",
"callerContext": {
"awsSdk": "awsSdkVersion",
"clientId": "clientId"
},
"request": {
"userAttributes": {
"phone_number_verified": false,
"email_verified": true
},
"type": "customEmailSenderRequestV1",
"code": "someCode"
}
}
1 change: 1 addition & 0 deletions tests/events/cognitoCustomMessageEvent.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"email_verified": true
},
"codeParameter": "####",
"linkParameter": "{##Click Here##}",
"usernameParameter": "username"
},
"response": {}
Expand Down
19 changes: 19 additions & 0 deletions tests/events/cognitoCustomSMSSenderEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"version": "1",
"triggerSource": "CustomSMSSender_SignUp",
"region": "region",
"userPoolId": "userPoolId",
"userName": "userName",
"callerContext": {
"awsSdk": "awsSdkVersion",
"clientId": "clientId"
},
"request": {
"userAttributes": {
"phone_number_verified": false,
"email_verified": true
},
"type": "customEmailSenderRequestV1",
"code": "someCode"
}
}
28 changes: 28 additions & 0 deletions tests/events/cognitoPreTokenV2GenerationEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"version": "1",
"triggerSource": "TokenGeneration_Authentication",
"region": "us-west-2",
"userPoolId": "us-west-2_example",
"userName": "testqq",
"callerContext": {
"awsSdkVersion": "aws-sdk-unknown-unknown",
"clientId": "clientId"
},
"request": {
"userAttributes": {
"sub": "0b0a57c5-f013-426a-81a1-f8ffbfba21f0",
"email_verified": "true",
"cognito:user_status": "CONFIRMED",
"email": "[email protected]"
},
"groupConfiguration": {
"groupsToOverride": [],
"iamRolesToOverride": [],
"preferredRole": null
},
"scopes": [
"aws.cognito.signin.user.admin"
]
},
"response": {}
}
Loading

0 comments on commit 036b3e0

Please sign in to comment.