diff --git a/awswrangler/dynamodb/_utils.py b/awswrangler/dynamodb/_utils.py index 046ebc79e..8108ccb00 100644 --- a/awswrangler/dynamodb/_utils.py +++ b/awswrangler/dynamodb/_utils.py @@ -2,11 +2,12 @@ import logging from types import TracebackType -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, Optional, Type, Union +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Mapping, Optional, Type, TypedDict, Union import boto3 from boto3.dynamodb.types import TypeDeserializer, TypeSerializer from botocore.exceptions import ClientError +from typing_extensions import NotRequired, Required from awswrangler import _utils, exceptions from awswrangler._config import apply_configs @@ -53,12 +54,33 @@ def get_table( return dynamodb_table +def _serialize_item( + item: Mapping[str, Any], serializer: Optional[TypeSerializer] = None +) -> Dict[str, "AttributeValueTypeDef"]: + serializer = serializer if serializer else TypeSerializer() + return {k: serializer.serialize(v) for k, v in item.items()} + + +def _deserialize_item( + item: Mapping[str, "AttributeValueTypeDef"], deserializer: Optional[TypeDeserializer] = None +) -> Dict[str, Any]: + deserializer = deserializer if deserializer else TypeDeserializer() + return {k: deserializer.deserialize(v) for k, v in item.items()} + + +class _ReadExecuteStatementKwargs(TypedDict): + Statement: Required[str] + ConsistentRead: Required[bool] + Parameters: NotRequired[List["AttributeValueTypeDef"]] + NextToken: NotRequired[str] + + def _execute_statement( - kwargs: Dict[str, Union[str, bool, List[Any]]], + kwargs: _ReadExecuteStatementKwargs, dynamodb_client: "DynamoDBClient", ) -> "ExecuteStatementOutputTypeDef": try: - response = dynamodb_client.execute_statement(**kwargs) # type: ignore[arg-type] + response = dynamodb_client.execute_statement(**kwargs) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": _logger.error("Couldn't execute PartiQL: '%s' because the table does not exist.", kwargs["Statement"]) @@ -73,22 +95,8 @@ def _execute_statement( return response -def _serialize_item( - item: Mapping[str, Any], serializer: Optional[TypeSerializer] = None -) -> Dict[str, "AttributeValueTypeDef"]: - serializer = serializer if serializer else TypeSerializer() - return {k: serializer.serialize(v) for k, v in item.items()} - - -def _deserialize_item( - item: Mapping[str, "AttributeValueTypeDef"], deserializer: Optional[TypeDeserializer] = None -) -> Dict[str, Any]: - deserializer = deserializer if deserializer else TypeDeserializer() - return {k: deserializer.deserialize(v) for k, v in item.items()} - - def _read_execute_statement( - kwargs: Dict[str, Union[str, bool, List[Any]]], + kwargs: _ReadExecuteStatementKwargs, dynamodb_client: "DynamoDBClient", ) -> Iterator[List[Dict[str, Any]]]: next_token: Optional[str] = "init_token" # Dummy token @@ -96,10 +104,12 @@ def _read_execute_statement( while next_token: response = _execute_statement(kwargs=kwargs, dynamodb_client=dynamodb_client) - next_token = response.get("NextToken", None) - kwargs["NextToken"] = next_token # type: ignore[assignment] yield [_deserialize_item(item, deserializer) for item in response["Items"]] + next_token = response.get("NextToken", None) + if next_token: + kwargs["NextToken"] = next_token + def execute_statement( statement: str, @@ -156,7 +166,7 @@ def execute_statement( ... parameters=[title, year], ... ) """ - kwargs: Dict[str, Union[str, bool, List[Any]]] = {"Statement": statement, "ConsistentRead": consistent_read} + kwargs: _ReadExecuteStatementKwargs = {"Statement": statement, "ConsistentRead": consistent_read} if parameters: serializer = TypeSerializer() kwargs["Parameters"] = [serializer.serialize(p) for p in parameters]