Skip to content

Commit

Permalink
Merge remote-tracking branch 'michaelbrewer/docs/router' into docs/ro…
Browse files Browse the repository at this point in the history
…uter

* michaelbrewer/docs/router:
  feat(data-classes): ActiveMQ and RabbitMQ support (aws-powertools#770)
  feat(appsync): add Router to allow large resolver composition (aws-powertools#776)
  chore(deps-dev): bump mkdocs-material from 7.3.3 to 7.3.5 (aws-powertools#781)
  chore(deps-dev): bump flake8-isort from 4.0.0 to 4.1.1 (aws-powertools#785)
  chore(deps): bump urllib3 from 1.26.4 to 1.26.5 (aws-powertools#787)
  chore(deps-dev): bump flake8-eradicate from 1.1.0 to 1.2.0 (aws-powertools#784)
  chore(deps): bump boto3 from 1.18.61 to 1.19.6 (aws-powertools#783)
  chore(deps-dev): bump pytest-asyncio from 0.15.1 to 0.16.0 (aws-powertools#782)
  docs: fix indentation of SAM snippets in install section (aws-powertools#778)
  Fix middleware sample (aws-powertools#772)
  Removed unused import, added typing imports, fixed typo in example. (aws-powertools#774)
  Fix middleware sample (aws-powertools#772)
  Removed unused import, added typing imports, fixed typo in example. (aws-powertools#774)
  Update docs/core/event_handler/api_gateway.md

# Conflicts:
#	docs/core/event_handler/api_gateway.md
  • Loading branch information
heitorlessa committed Oct 29, 2021
2 parents 3632d18 + 94493dc commit dcab235
Show file tree
Hide file tree
Showing 13 changed files with 608 additions and 79 deletions.
75 changes: 48 additions & 27 deletions aws_lambda_powertools/event_handler/appsync.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from abc import ABC
from typing import Any, Callable, Optional, Type, TypeVar

from aws_lambda_powertools.utilities.data_classes import AppSyncResolverEvent
Expand All @@ -9,7 +10,33 @@
AppSyncResolverEventT = TypeVar("AppSyncResolverEventT", bound=AppSyncResolverEvent)


class AppSyncResolver:
class BaseRouter(ABC):
current_event: AppSyncResolverEventT # type: ignore[valid-type]
lambda_context: LambdaContext

def __init__(self):
self._resolvers: dict = {}

def resolver(self, type_name: str = "*", field_name: Optional[str] = None):
"""Registers the resolver for field_name
Parameters
----------
type_name : str
Type name
field_name : str
Field name
"""

def register_resolver(func):
logger.debug(f"Adding resolver `{func.__name__}` for field `{type_name}.{field_name}`")
self._resolvers[f"{type_name}.{field_name}"] = {"func": func}
return func

return register_resolver


class AppSyncResolver(BaseRouter):
"""
AppSync resolver decorator
Expand Down Expand Up @@ -40,29 +67,8 @@ def common_field() -> str:
return str(uuid.uuid4())
"""

current_event: AppSyncResolverEventT # type: ignore[valid-type]
lambda_context: LambdaContext

def __init__(self):
self._resolvers: dict = {}

def resolver(self, type_name: str = "*", field_name: Optional[str] = None):
"""Registers the resolver for field_name
Parameters
----------
type_name : str
Type name
field_name : str
Field name
"""

def register_resolver(func):
logger.debug(f"Adding resolver `{func.__name__}` for field `{type_name}.{field_name}`")
self._resolvers[f"{type_name}.{field_name}"] = {"func": func}
return func

return register_resolver
super().__init__()

def resolve(
self, event: dict, context: LambdaContext, data_model: Type[AppSyncResolverEvent] = AppSyncResolverEvent
Expand Down Expand Up @@ -136,10 +142,10 @@ def lambda_handler(event, context):
ValueError
If we could not find a field resolver
"""
self.current_event = data_model(event)
self.lambda_context = context
resolver = self._get_resolver(self.current_event.type_name, self.current_event.field_name)
return resolver(**self.current_event.arguments)
BaseRouter.current_event = data_model(event)
BaseRouter.lambda_context = context
resolver = self._get_resolver(BaseRouter.current_event.type_name, BaseRouter.current_event.field_name)
return resolver(**BaseRouter.current_event.arguments)

def _get_resolver(self, type_name: str, field_name: str) -> Callable:
"""Get resolver for field_name
Expand Down Expand Up @@ -167,3 +173,18 @@ def __call__(
) -> Any:
"""Implicit lambda handler which internally calls `resolve`"""
return self.resolve(event, context, data_model)

def include_router(self, router: "Router") -> None:
"""Adds all resolvers defined in a router
Parameters
----------
router : Router
A router containing a dict of field resolvers
"""
self._resolvers.update(router._resolvers)


class Router(BaseRouter):
def __init__(self):
super().__init__()
125 changes: 125 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/active_mq_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import base64
import json
from typing import Any, Iterator, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class ActiveMQMessage(DictWrapper):
@property
def message_id(self) -> str:
"""Unique identifier for the message"""
return self["messageID"]

@property
def message_type(self) -> str:
return self["messageType"]

@property
def data(self) -> str:
return self["data"]

@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()

@property
def json_data(self) -> Any:
"""Parses the data as json"""
return json.loads(self.decoded_data)

@property
def connection_id(self) -> str:
return self["connectionId"]

@property
def redelivered(self) -> bool:
"""true if the message is being resent to the consumer"""
return self["redelivered"]

@property
def timestamp(self) -> int:
"""Time in milliseconds."""
return self["timestamp"]

@property
def broker_in_time(self) -> int:
"""Time stamp (in milliseconds) for when the message arrived at the broker."""
return self["brokerInTime"]

@property
def broker_out_time(self) -> int:
"""Time stamp (in milliseconds) for when the message left the broker."""
return self["brokerOutTime"]

@property
def destination_physicalname(self) -> str:
return self["destination"]["physicalname"]

@property
def delivery_mode(self) -> Optional[int]:
"""persistent or non-persistent delivery"""
return self.get("deliveryMode")

@property
def correlation_id(self) -> Optional[str]:
"""User defined correlation id"""
return self.get("correlationID")

@property
def reply_to(self) -> Optional[str]:
"""User defined reply to"""
return self.get("replyTo")

@property
def get_type(self) -> Optional[str]:
"""User defined message type"""
return self.get("type")

@property
def expiration(self) -> Optional[int]:
"""Expiration attribute whose value is given in milliseconds"""
return self.get("expiration")

@property
def priority(self) -> Optional[int]:
"""
JMS defines a ten-level priority value, with 0 as the lowest priority and 9
as the highest. In addition, clients should consider priorities 0-4 as
gradations of normal priority and priorities 5-9 as gradations of expedited
priority.
JMS does not require that a provider strictly implement priority ordering
of messages; however, it should do its best to deliver expedited messages
ahead of normal messages.
"""
return self.get("priority")


class ActiveMQEvent(DictWrapper):
"""Represents an Active MQ event sent to Lambda
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/
"""

@property
def event_source(self) -> str:
return self["eventSource"]

@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]

@property
def messages(self) -> Iterator[ActiveMQMessage]:
for record in self["messages"]:
yield ActiveMQMessage(record)

@property
def message(self) -> ActiveMQMessage:
return next(self.messages)
121 changes: 121 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import base64
import json
from typing import Any, Dict, List

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class BasicProperties(DictWrapper):
@property
def content_type(self) -> str:
return self["contentType"]

@property
def content_encoding(self) -> str:
return self["contentEncoding"]

@property
def headers(self) -> Dict[str, Any]:
return self["headers"]

@property
def delivery_mode(self) -> int:
return self["deliveryMode"]

@property
def priority(self) -> int:
return self["priority"]

@property
def correlation_id(self) -> str:
return self["correlationId"]

@property
def reply_to(self) -> str:
return self["replyTo"]

@property
def expiration(self) -> str:
return self["expiration"]

@property
def message_id(self) -> str:
return self["messageId"]

@property
def timestamp(self) -> str:
return self["timestamp"]

@property
def get_type(self) -> str:
return self["type"]

@property
def user_id(self) -> str:
return self["userId"]

@property
def app_id(self) -> str:
return self["appId"]

@property
def cluster_id(self) -> str:
return self["clusterId"]

@property
def body_size(self) -> int:
return self["bodySize"]


class RabbitMessage(DictWrapper):
@property
def basic_properties(self) -> BasicProperties:
return BasicProperties(self["basicProperties"])

@property
def redelivered(self) -> bool:
return self["redelivered"]

@property
def data(self) -> str:
return self["data"]

@property
def decoded_data(self) -> str:
"""Decodes the data as a str"""
return base64.b64decode(self.data.encode()).decode()

@property
def json_data(self) -> Any:
"""Parses the data as json"""
return json.loads(self.decoded_data)


class RabbitMQEvent(DictWrapper):
"""Represents a Rabbit MQ event sent to Lambda
Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html
- https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/
"""

def __init__(self, data: Dict[str, Any]):
super().__init__(data)
self._rmq_messages_by_queue = {
key: [RabbitMessage(message) for message in messages]
for key, messages in self["rmqMessagesByQueue"].items()
}

@property
def event_source(self) -> str:
return self["eventSource"]

@property
def event_source_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the event source"""
return self["eventSourceArn"]

@property
def rmq_messages_by_queue(self) -> Dict[str, List[RabbitMessage]]:
return self._rmq_messages_by_queue
Loading

0 comments on commit dcab235

Please sign in to comment.