diff --git a/.github/workflows/backend_checks.yml b/.github/workflows/backend_checks.yml
index fbf8267532..2f62a33bf7 100644
--- a/.github/workflows/backend_checks.yml
+++ b/.github/workflows/backend_checks.yml
@@ -225,6 +225,9 @@ jobs:
OKTA_CLIENT_TOKEN: ${{ secrets.OKTA_FIDESCTL_CLIENT_TOKEN }}
AWS_DEFAULT_REGION: us-east-1
BIGQUERY_CONFIG: ${{ secrets.BIGQUERY_CONFIG }}
+ DYNAMODB_REGION: ${{ secrets.DYNAMODB_REGION }}
+ DYNAMODB_ACCESS_KEY_ID: ${{ secrets.DYNAMODB_ACCESS_KEY_ID }}
+ DYNAMODB_ACCESS_KEY: ${{ secrets.DYNAMODB_ACCESS_KEY }}
External-Datastores:
needs: Build
@@ -259,6 +262,10 @@ jobs:
BIGQUERY_KEYFILE_CREDS: ${{ secrets.BIGQUERY_KEYFILE_CREDS }}
BIGQUERY_DATASET: fidesopstest
SNOWFLAKE_TEST_URI: ${{ secrets.SNOWFLAKE_TEST_URI }}
+ DYNAMODB_REGION: ${{ secrets.DYNAMODB_REGION }}
+ DYNAMODB_ACCESS_KEY_ID: ${{ secrets.DYNAMODB_ACCESS_KEY_ID }}
+ DYNAMODB_ACCESS_KEY: ${{ secrets.DYNAMODB_ACCESS_KEY }}
+
run: nox -s "pytest(ops-external-datastores)"
External-SaaS-Connectors:
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cfd807a043..427aa68d04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ The types of changes are:
### Added
+- Connector for DynamoDB [#2998](https://github.com/ethyca/fides/pull/2998)
- Access and erasure support for Amplitude [#2569](https://github.com/ethyca/fides/pull/2569)
- Access and erasure support for Gorgias [#2444](https://github.com/ethyca/fides/pull/2444)
- Privacy Experience Bulk Create, Bulk Update, and Detail Endpoints [#3185](https://github.com/ethyca/fides/pull/3185)
diff --git a/clients/admin-ui/public/images/connector-logos/dynamodb.svg b/clients/admin-ui/public/images/connector-logos/dynamodb.svg
new file mode 100644
index 0000000000..bccc7a31f0
--- /dev/null
+++ b/clients/admin-ui/public/images/connector-logos/dynamodb.svg
@@ -0,0 +1,10 @@
+
diff --git a/clients/admin-ui/src/features/datastore-connections/constants.ts b/clients/admin-ui/src/features/datastore-connections/constants.ts
index 7cb63c9729..03d452de02 100644
--- a/clients/admin-ui/src/features/datastore-connections/constants.ts
+++ b/clients/admin-ui/src/features/datastore-connections/constants.ts
@@ -46,6 +46,7 @@ export const CONNECTION_TYPE_LOGO_MAP = new Map([
[ConnectionType.TIMESCALE, "timescaledb.svg"],
[ConnectionType.SOVRN, "sovrn.svg"],
[ConnectionType.ATTENTIVE, "attentive.svg"],
+ [ConnectionType.DYNAMODB, "dynamodb.svg"],
]);
/**
diff --git a/clients/admin-ui/src/types/api/models/ConnectionType.ts b/clients/admin-ui/src/types/api/models/ConnectionType.ts
index 1df969d735..29a0b5d0e9 100644
--- a/clients/admin-ui/src/types/api/models/ConnectionType.ts
+++ b/clients/admin-ui/src/types/api/models/ConnectionType.ts
@@ -22,4 +22,5 @@ export enum ConnectionType {
MANUAL_WEBHOOK = "manual_webhook",
TIMESCALE = "timescale",
FIDES = "fides",
+ DYNAMODB = "dynamodb",
}
diff --git a/data/dataset/dynamodb_example_test_dataset.yml b/data/dataset/dynamodb_example_test_dataset.yml
new file mode 100644
index 0000000000..d9ecbb8d1f
--- /dev/null
+++ b/data/dataset/dynamodb_example_test_dataset.yml
@@ -0,0 +1,80 @@
+dataset:
+ - fides_key: dynamodb_example_test_dataset
+ name: DynamoDB Example Test Dataset
+ description: Example of a DynamoDB dataset containing a single customer table
+ collections:
+ - name: customer_identifier
+ fields:
+ - name: customer_id
+ data_categories: [user.unique_id]
+ fides_meta:
+ references:
+ - dataset: dynamodb_example_test_dataset
+ field: customer.id
+ direction: to
+ - dataset: dynamodb_example_test_dataset
+ field: login.customer_id
+ direction: to
+ - name: created
+ data_categories: [system.operations]
+ - name: email
+ data_categories: [user.contact.email]
+ fides_meta:
+ primary_key: True
+ identity: email
+ data_type: string
+ - name: name
+ data_categories: [user.name]
+ - name: address
+ fields:
+ - name: city
+ data_categories: [user.contact.address.city]
+ - name: house
+ data_categories: [user.contact.address.street]
+ - name: id
+ data_categories: [system.operations]
+ fides_meta:
+ primary_key: True
+ - name: state
+ data_categories: [user.contact.address.state]
+ - name: street
+ data_categories: [user.contact.address.street]
+ - name: zip
+ data_categories: [user.contact.address.postal_code]
+ - name: customer
+ fields:
+ - name: address_id
+ data_categories: [system.operations]
+ fides_meta:
+ references:
+ - dataset: dynamodb_example_test_dataset
+ field: address.id
+ direction: to
+ - name: created
+ data_categories: [system.operations]
+ - name: customer_email
+ data_categories: [user.contact.email]
+ fides_meta:
+ identity: email
+ data_type: string
+ - name: id
+ data_categories: [user.unique_id]
+ fides_meta:
+ primary_key: True
+ - name: name
+ data_categories: [user.name]
+ - name: login
+ fields:
+ - name: customer_id
+ data_categories: [user.unique_id]
+ fides_meta:
+ primary_key: True
+ - name: login_date
+ data_categories: [system.operations]
+ - name: name
+ data_categories: [user.name]
+ - name: customer_email
+ data_categories: [user.contact.email]
+ fides_meta:
+ identity: email
+ data_type: string
diff --git a/noxfiles/run_infrastructure.py b/noxfiles/run_infrastructure.py
index a54353d401..236db34f16 100644
--- a/noxfiles/run_infrastructure.py
+++ b/noxfiles/run_infrastructure.py
@@ -23,6 +23,11 @@
"snowflake": ["SNOWFLAKE_TEST_URI"],
"redshift": ["REDSHIFT_TEST_URI", "REDSHIFT_TEST_DB_SCHEMA"],
"bigquery": ["BIGQUERY_KEYFILE_CREDS", "BIGQUERY_DATASET"],
+ "dynamodb": [
+ "DYNAMODB_REGION",
+ "DYNAMODB_ACCESS_KEY_ID",
+ "DYNAMODB_ACCESS_KEY",
+ ],
}
EXTERNAL_DATASTORES = list(EXTERNAL_DATASTORE_CONFIG.keys())
ALL_DATASTORES = DOCKERFILE_DATASTORES + EXTERNAL_DATASTORES
diff --git a/noxfiles/test_setup_nox.py b/noxfiles/test_setup_nox.py
index bec4aec651..9d30a70a27 100644
--- a/noxfiles/test_setup_nox.py
+++ b/noxfiles/test_setup_nox.py
@@ -70,6 +70,12 @@ def pytest_ctl(session: Session, mark: str, coverage_arg: str) -> None:
"OKTA_CLIENT_TOKEN",
"-e",
"BIGQUERY_CONFIG",
+ "-e",
+ "DYNAMODB_REGION",
+ "-e",
+ "DYNAMODB_ACCESS_KEY_ID",
+ "-e",
+ "DYNAMODB_ACCESS_KEY",
CI_ARGS_EXEC,
CONTAINER_NAME,
"pytest",
@@ -133,6 +139,12 @@ def pytest_ops(session: Session, mark: str, coverage_arg: str) -> None:
"BIGQUERY_KEYFILE_CREDS",
"-e",
"BIGQUERY_DATASET",
+ "-e",
+ "DYNAMODB_REGION",
+ "-e",
+ "DYNAMODB_ACCESS_KEY_ID",
+ "-e",
+ "DYNAMODB_ACCESS_KEY",
CI_ARGS_EXEC,
CONTAINER_NAME,
"pytest",
diff --git a/pyproject.toml b/pyproject.toml
index 9d8ff4d1e6..54236119c9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -209,6 +209,7 @@ markers = [
"integration_twilio_conversations",
"integration_adobe_campaign",
"integration_firebase_auth",
+ "integration_dynamodb",
"unit_saas"
]
asyncio_mode = "auto"
diff --git a/src/fides/api/ctl/migrations/versions/fc04e3e637c0_add_dynamodb_to_connector_list.py b/src/fides/api/ctl/migrations/versions/fc04e3e637c0_add_dynamodb_to_connector_list.py
new file mode 100644
index 0000000000..dcc8cb4619
--- /dev/null
+++ b/src/fides/api/ctl/migrations/versions/fc04e3e637c0_add_dynamodb_to_connector_list.py
@@ -0,0 +1,49 @@
+"""add dynamodb to connector list
+
+Revision ID: fc04e3e637c0
+Revises: 15a3e7483249
+Create Date: 2023-04-14 10:19:50.681752
+
+"""
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "fc04e3e637c0"
+down_revision = "15a3e7483249"
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+ # Add 'dynamodb' to ConnectionType enum
+ op.execute("alter type connectiontype rename to connectiontype_old")
+ op.execute(
+ "create type connectiontype as enum('postgres', 'mongodb', 'mysql', 'https', "
+ "'snowflake', 'redshift', 'mssql', 'mariadb', 'bigquery', 'saas', 'manual', "
+ "'manual_webhook', 'timescale', 'fides', 'sovrn', 'attentive', 'dynamodb')"
+ )
+ op.execute(
+ (
+ "alter table connectionconfig alter column connection_type type connectiontype using "
+ "connection_type::text::connectiontype"
+ )
+ )
+ op.execute("drop type connectiontype_old")
+
+
+def downgrade():
+ # Remove dynamodb from the connectiontype enum
+ op.execute("delete from connectionconfig where connection_type in ('dynamodb')")
+ op.execute("alter type connectiontype rename to connectiontype_old")
+ op.execute(
+ "create type connectiontype as enum('postgres', 'mongodb', 'mysql', 'https', "
+ "'snowflake', 'redshift', 'mssql', 'mariadb', 'bigquery', 'saas', 'manual', "
+ "'email', 'manual_webhook', 'timescale', 'fides', 'sovrn', 'attentive')"
+ )
+ op.execute(
+ (
+ "alter table connectionconfig alter column connection_type type connectiontype using "
+ "connection_type::text::connectiontype"
+ )
+ )
+ op.execute("drop type connectiontype_old")
diff --git a/src/fides/api/ctl/routes/generate.py b/src/fides/api/ctl/routes/generate.py
index c6500749da..6ae4f7a42b 100644
--- a/src/fides/api/ctl/routes/generate.py
+++ b/src/fides/api/ctl/routes/generate.py
@@ -25,7 +25,11 @@
DatabaseConfig,
OktaConfig,
)
-from fides.core.dataset import generate_bigquery_datasets, generate_db_datasets
+from fides.core.dataset import (
+ generate_bigquery_datasets,
+ generate_db_datasets,
+ generate_dynamo_db_datasets,
+)
from fides.core.system import generate_aws_systems, generate_okta_systems
from fides.core.utils import validate_db_engine
@@ -39,6 +43,7 @@ class ValidTargets(str, Enum):
DB = "db"
OKTA = "okta"
BIGQUERY = "bigquery"
+ DYNAMODB = "dynamodb"
class GenerateTypes(str, Enum):
@@ -70,9 +75,10 @@ def target_matches_type(cls, values: Dict) -> Dict:
target_type = (values.get("target"), values.get("type"))
valid_target_types = [
("aws", "systems"),
- ("okta", "systems"),
- ("db", "datasets"),
("bigquery", "datasets"),
+ ("db", "datasets"),
+ ("dynamodb", "datasets"),
+ ("okta", "systems"),
]
if target_type not in valid_target_types:
raise ValueError("Target and Type are not a valid match")
@@ -129,6 +135,7 @@ async def generate(
* Okta: Systems
* DB: Datasets
* BigQuery: Datasets
+ * DynamoDB: Datasets
In the future, this will include options for other Systems & Datasets,
examples include:
@@ -141,7 +148,6 @@ async def generate(
)
generate_config = generate_request_payload.generate.config
generate_target = generate_request_payload.generate.target.lower()
-
try:
if generate_target == "aws" and isinstance(generate_config, AWSConfig):
generate_results = generate_aws(
@@ -167,6 +173,12 @@ async def generate(
bigquery_config=generate_config,
)
+ elif generate_target == "dynamodb" and isinstance(generate_config, AWSConfig):
+ generate_results = generate_dynamodb(
+ aws_config=generate_config,
+ organization=organization,
+ )
+
except ConnectorAuthFailureException as error:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
@@ -199,6 +211,29 @@ def generate_aws(
return [i.dict(exclude_none=True) for i in aws_systems]
+def generate_dynamodb(
+ aws_config: AWSConfig, organization: Organization
+) -> List[Dict[str, str]]:
+ """
+ Returns a list of DynamoDB datasets found in AWS.
+ """
+ from fides.connectors.aws import validate_credentials
+
+ log.info("Validating AWS credentials")
+ try:
+ validate_credentials(aws_config)
+ except ConnectorAuthFailureException as error:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail=str(error),
+ )
+
+ log.info("Generating datasets from AWS DynamoDB")
+ aws_resources = [generate_dynamo_db_datasets(aws_config=aws_config)]
+
+ return [i.dict(exclude_none=True) for i in aws_resources]
+
+
async def generate_okta(
okta_config: OktaConfig, organization: Organization
) -> List[Dict[str, str]]:
diff --git a/src/fides/api/ops/models/connectionconfig.py b/src/fides/api/ops/models/connectionconfig.py
index d1caeb8794..23706a58e4 100644
--- a/src/fides/api/ops/models/connectionconfig.py
+++ b/src/fides/api/ops/models/connectionconfig.py
@@ -51,6 +51,7 @@ class ConnectionType(enum.Enum):
manual = "manual" # Run as part of the traversal
sovrn = "sovrn"
attentive = "attentive"
+ dynamodb = "dynamodb"
manual_webhook = "manual_webhook" # Run before the traversal
timescale = "timescale"
fides = "fides"
@@ -63,6 +64,7 @@ def human_readable(self) -> str:
readable_mapping: Dict[str, str] = {
ConnectionType.attentive.value: "Attentive",
ConnectionType.bigquery.value: "BigQuery",
+ ConnectionType.dynamodb.value: "DynamoDB",
ConnectionType.fides.value: "Fides Connector",
ConnectionType.https.value: "Policy Webhook",
ConnectionType.manual.value: "Manual Connector",
diff --git a/src/fides/api/ops/schemas/connection_configuration/__init__.py b/src/fides/api/ops/schemas/connection_configuration/__init__.py
index 5f93940cba..282934d598 100644
--- a/src/fides/api/ops/schemas/connection_configuration/__init__.py
+++ b/src/fides/api/ops/schemas/connection_configuration/__init__.py
@@ -19,6 +19,12 @@
from fides.api.ops.schemas.connection_configuration.connection_secrets_bigquery import (
BigQuerySchema as BigQuerySchema,
)
+from fides.api.ops.schemas.connection_configuration.connection_secrets_dynamodb import (
+ DynamoDBDocsSchema as DynamoDBDocsSchema,
+)
+from fides.api.ops.schemas.connection_configuration.connection_secrets_dynamodb import (
+ DynamoDBSchema as DynamoDBSchema,
+)
from fides.api.ops.schemas.connection_configuration.connection_secrets_email import (
EmailDocsSchema as EmailDocsSchema,
)
@@ -101,6 +107,7 @@
secrets_schemas: Dict[str, Any] = {
ConnectionType.attentive.value: AttentiveSchema,
ConnectionType.bigquery.value: BigQuerySchema,
+ ConnectionType.dynamodb.value: DynamoDBSchema,
ConnectionType.fides.value: FidesConnectorSchema,
ConnectionType.https.value: HttpsSchema,
ConnectionType.manual_webhook.value: ManualWebhookSchema,
@@ -158,4 +165,5 @@ def get_connection_secrets_schema(
TimescaleDocsSchema,
FidesDocsSchema,
SovrnDocsSchema,
+ DynamoDBDocsSchema,
]
diff --git a/src/fides/api/ops/schemas/connection_configuration/connection_secrets_dynamodb.py b/src/fides/api/ops/schemas/connection_configuration/connection_secrets_dynamodb.py
new file mode 100644
index 0000000000..c7de1d6e33
--- /dev/null
+++ b/src/fides/api/ops/schemas/connection_configuration/connection_secrets_dynamodb.py
@@ -0,0 +1,24 @@
+from typing import List
+
+from fides.api.ops.schemas.base_class import NoValidationSchema
+from fides.api.ops.schemas.connection_configuration.connection_secrets import (
+ ConnectionConfigSecretsSchema,
+)
+
+
+class DynamoDBSchema(ConnectionConfigSecretsSchema):
+    """Schema to validate the secrets needed to connect to Amazon DynamoDB"""
+
+ region_name: str
+ aws_secret_access_key: str
+ aws_access_key_id: str
+
+ _required_components: List[str] = [
+ "region_name",
+ "aws_secret_access_key",
+ "aws_access_key_id",
+ ]
+
+
+class DynamoDBDocsSchema(DynamoDBSchema, NoValidationSchema):
+ """DynamoDB Secrets Schema for API Docs"""
diff --git a/src/fides/api/ops/service/connectors/__init__.py b/src/fides/api/ops/service/connectors/__init__.py
index 7109f5ff83..a79616a690 100644
--- a/src/fides/api/ops/service/connectors/__init__.py
+++ b/src/fides/api/ops/service/connectors/__init__.py
@@ -11,6 +11,9 @@
from fides.api.ops.service.connectors.base_connector import (
BaseConnector as BaseConnector,
)
+from fides.api.ops.service.connectors.dynamodb_connector import (
+ DynamoDBConnector as DynamoDBConnector,
+)
from fides.api.ops.service.connectors.email.attentive_connector import (
AttentiveConnector,
)
@@ -61,6 +64,7 @@
supported_connectors: Dict[str, Any] = {
ConnectionType.attentive.value: AttentiveConnector,
ConnectionType.bigquery.value: BigQueryConnector,
+ ConnectionType.dynamodb.value: DynamoDBConnector,
ConnectionType.fides.value: FidesConnector,
ConnectionType.https.value: HTTPSConnector,
ConnectionType.manual.value: ManualConnector,
diff --git a/src/fides/api/ops/service/connectors/dynamodb_connector.py b/src/fides/api/ops/service/connectors/dynamodb_connector.py
new file mode 100644
index 0000000000..4b0c9fe829
--- /dev/null
+++ b/src/fides/api/ops/service/connectors/dynamodb_connector.py
@@ -0,0 +1,177 @@
+import itertools
+from typing import Any, Dict, Generator, List, Optional
+
+from boto3.dynamodb.types import TypeDeserializer
+from botocore.exceptions import ClientError
+from loguru import logger
+
+import fides.connectors.aws as aws_connector
+from fides.api.ops.common_exceptions import ConnectionException
+from fides.api.ops.graph.traversal import TraversalNode
+from fides.api.ops.models.connectionconfig import ConnectionTestStatus
+from fides.api.ops.models.policy import Policy
+from fides.api.ops.models.privacy_request import PrivacyRequest
+from fides.api.ops.schemas.connection_configuration.connection_secrets_dynamodb import (
+ DynamoDBSchema,
+)
+from fides.api.ops.service.connectors.base_connector import BaseConnector
+from fides.api.ops.service.connectors.query_config import (
+ DynamoDBQueryConfig,
+ QueryConfig,
+)
+from fides.api.ops.util.collection_util import Row
+from fides.api.ops.util.logger import Pii
+from fides.connectors.models import (
+ AWSConfig,
+ ConnectorAuthFailureException,
+ ConnectorFailureException,
+)
+
+
+class DynamoDBConnector(BaseConnector[Any]): # type: ignore
+ """AWS DynamoDB Connector"""
+
+ def build_uri(self) -> None:
+ """Not used for this type"""
+
+ def close(self) -> None:
+ """Not used for this type"""
+
+ def create_client(self) -> Any: # type: ignore
+ """Returns a client for a DynamoDB instance"""
+ config = DynamoDBSchema(**self.configuration.secrets or {})
+ try:
+ aws_config = AWSConfig(
+ region_name=config.region_name,
+ aws_access_key_id=config.aws_access_key_id,
+ aws_secret_access_key=config.aws_secret_access_key,
+ )
+ return aws_connector.get_aws_client(
+ service="dynamodb", aws_config=aws_config
+ )
+ except ValueError:
+ raise ConnectionException("Value Error connecting to AWS DynamoDB.")
+
+ def query_config(self, node: TraversalNode) -> QueryConfig[Any]:
+ """Query wrapper corresponding to the input traversal_node."""
+ client = self.client()
+ try:
+ describe_table = client.describe_table(TableName=node.address.collection)
+ for key in describe_table["Table"]["KeySchema"]:
+ if key["KeyType"] == "HASH":
+ hash_key = key["AttributeName"]
+ break
+ for key in describe_table["Table"]["AttributeDefinitions"]:
+ if key["AttributeName"] == hash_key:
+ attribute_definitions = [key]
+ break
+ except ClientError as error:
+ raise ConnectorFailureException(error.response["Error"]["Message"])
+
+ return DynamoDBQueryConfig(node, attribute_definitions)
+
+ def test_connection(self) -> Optional[ConnectionTestStatus]:
+ """
+ Connects to AWS DynamoDB and lists tables to validate credentials.
+ """
+ logger.info("Starting test connection to {}", self.configuration.key)
+ client = self.client()
+ try:
+ client.list_tables()
+ except ClientError as error:
+ if error.response["Error"]["Code"] in [
+ "InvalidClientTokenId",
+ "SignatureDoesNotMatch",
+ ]:
+ raise ConnectorAuthFailureException(error.response["Error"]["Message"])
+ raise ConnectorFailureException(error.response["Error"]["Message"])
+
+ return ConnectionTestStatus.succeeded
+
+ def retrieve_data(
+ self,
+ node: TraversalNode,
+ policy: Policy,
+ privacy_request: PrivacyRequest,
+ input_data: Dict[str, List[Any]],
+ ) -> List[Row]:
+ """
+ Retrieve DynamoDB data.
+ In the case of complex objects, returns multiple rows
+ as the product of options to query against.
+ """
+ deserializer = TypeDeserializer()
+ collection_name = node.address.collection
+ client = self.client()
+ try:
+ results = []
+ query_config = self.query_config(node)
+ for attribute_definition in query_config.attribute_definitions: # type: ignore
+ attribute_name = attribute_definition["AttributeName"]
+ for identifier in input_data[attribute_name]:
+ selected_input_data = input_data
+ selected_input_data[attribute_name] = [identifier]
+ query_param = query_config.generate_query(
+ selected_input_data, policy
+ )
+ if query_param is None:
+ return []
+ items = client.query(
+ TableName=collection_name,
+ ExpressionAttributeValues=query_param[
+ "ExpressionAttributeValues"
+ ],
+ KeyConditionExpression=query_param["KeyConditionExpression"],
+ )
+ for item in items.get("Items"):
+ result = {}
+ for key, value in item.items():
+ deserialized_value = deserializer.deserialize(value)
+ result[key] = deserialized_value
+ results.append(result)
+ return results
+ except ClientError as error:
+ raise ConnectorFailureException(error.response["Error"]["Message"])
+
+ def mask_data(
+ self,
+ node: TraversalNode,
+ policy: Policy,
+ privacy_request: PrivacyRequest,
+ rows: List[Row],
+ input_data: Dict[str, List[Any]],
+ ) -> int:
+        """Execute a masking request for DynamoDB"""
+
+ query_config = self.query_config(node)
+ collection_name = node.address.collection
+ update_ct = 0
+ for row in rows:
+ update_items = query_config.generate_update_stmt(
+ row, policy, privacy_request
+ )
+ if update_items is not None:
+ client = self.client()
+ update_result = client.put_item(
+ TableName=collection_name,
+ Item=update_items,
+ )
+ if update_result["ResponseMetadata"]["HTTPStatusCode"] == 200:
+ update_ct += 1
+ logger.info(
+ "client.put_item({}, {})",
+ collection_name,
+ Pii(update_items),
+ )
+
+ return update_ct
+
+
+def product_dict(**kwargs: List) -> Generator:
+ """
+ Takes a dictionary of lists, returning the product
+ as a list of dictionaries.
+ """
+ keys = kwargs.keys()
+ for instance in itertools.product(*kwargs.values()):
+ yield dict(zip(keys, instance))
diff --git a/src/fides/api/ops/service/connectors/query_config.py b/src/fides/api/ops/service/connectors/query_config.py
index a5f67ac532..3f8244ca54 100644
--- a/src/fides/api/ops/service/connectors/query_config.py
+++ b/src/fides/api/ops/service/connectors/query_config.py
@@ -3,6 +3,7 @@
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar
import pydash
+from boto3.dynamodb.types import TypeSerializer
from loguru import logger
from sqlalchemy import MetaData, Table, text
from sqlalchemy.engine import Engine
@@ -756,3 +757,79 @@ def dry_run_query(self) -> Optional[str]:
if mongo_query is not None:
return self.query_to_str(mongo_query, data)
return None
+
+
+DynamoDBStatement = Dict[str, Any]
+"""A DynamoDB query is formed using the boto3 library. The required parameters are:
+ * a table/collection name (string)
+ * the key name to pass when accessing the table, along with type and value (dict)
+ * optionally, the sort key or secondary index (i.e. timestamp)
+ * optionally, the specified attributes can be provided. If None, all attributes
+ returned for item.
+
+ # TODO finish these docs
+
+ We can either represent these items as a model and then handle each of the values
+ accordingly in the connector or use this query config to return a dictionary that
+ can be appropriately unpacked when executing using the client.
+
+ The get_item query has been left out of the query_config for now.
+
+ Add an example for put_item
+ """
+
+
+class DynamoDBQueryConfig(QueryConfig[DynamoDBStatement]):
+ def __init__(
+ self, node: TraversalNode, attribute_definitions: List[Dict[str, Any]]
+ ):
+ super().__init__(node)
+ self.attribute_definitions = attribute_definitions
+
+ def generate_query(
+ self,
+ input_data: Dict[str, List[Any]],
+ policy: Optional[Policy],
+ ) -> Optional[DynamoDBStatement]:
+ """Generates a dictionary for the `query` method used for DynamoDB"""
+ query_param = {}
+ serializer = TypeSerializer()
+ for attribute_definition in self.attribute_definitions:
+ attribute_name = attribute_definition["AttributeName"]
+ attribute_value = input_data[attribute_name][0]
+ query_param["ExpressionAttributeValues"] = {
+ ":value": serializer.serialize(attribute_value)
+ }
+ key_condition_expression: str = f"{attribute_name} = :value"
+ query_param["KeyConditionExpression"] = key_condition_expression # type: ignore
+ return query_param
+
+ def generate_update_stmt(
+ self, row: Row, policy: Policy, request: PrivacyRequest
+ ) -> Optional[DynamoDBStatement]:
+ """
+ Generate a Dictionary that contains necessary items to
+ run a PUT operation against DynamoDB
+ """
+ update_clauses = self.update_value_map(row, policy, request)
+
+ if update_clauses:
+ serializer = TypeSerializer()
+ update_items = row
+ for key, value in update_items.items():
+ if key in update_clauses:
+ update_items[key] = serializer.serialize(update_clauses[key])
+ else:
+ update_items[key] = serializer.serialize(value)
+ else:
+ update_items = None
+
+ return update_items
+
+ def query_to_str(self, t: T, input_data: Dict[str, List[Any]]) -> None:
+ """Not used for this connector"""
+ return None
+
+ def dry_run_query(self) -> None:
+ """Not used for this connector"""
+ return None
diff --git a/src/fides/api/ops/task/task_resources.py b/src/fides/api/ops/task/task_resources.py
index 8f56219894..06c4d1e1a9 100644
--- a/src/fides/api/ops/task/task_resources.py
+++ b/src/fides/api/ops/task/task_resources.py
@@ -16,6 +16,7 @@
from fides.api.ops.service.connectors import (
BaseConnector,
BigQueryConnector,
+ DynamoDBConnector,
FidesConnector,
ManualConnector,
MariaDBConnector,
@@ -77,6 +78,8 @@ def build_connector( # pylint: disable=R0911,R0912
return ManualConnector(connection_config)
if connection_config.connection_type == ConnectionType.timescale:
return TimescaleConnector(connection_config)
+ if connection_config.connection_type == ConnectionType.dynamodb:
+ return DynamoDBConnector(connection_config)
if connection_config.connection_type == ConnectionType.fides:
return FidesConnector(connection_config)
raise NotImplementedError(
diff --git a/src/fides/cli/commands/generate.py b/src/fides/cli/commands/generate.py
index 5cd8d64392..83258b4540 100644
--- a/src/fides/cli/commands/generate.py
+++ b/src/fides/cli/commands/generate.py
@@ -111,6 +111,52 @@ def generate_dataset_bigquery(
)
+@generate_dataset.group(name="aws")
+@click.pass_context
+def generate_dataset_aws(ctx: click.Context) -> None:
+ """
+ Generate Fides datasets from specific Amazon Web Services.
+ """
+
+
+@generate_dataset_aws.command(name="dynamodb")
+@click.pass_context
+@click.argument("output_filename", type=str)
+@credentials_id_option
+@aws_access_key_id_option
+@aws_secret_access_key_option
+@aws_region_option
+@include_null_flag
+@with_analytics
+def generate_dataset_dynamodb(
+ ctx: click.Context,
+ output_filename: str,
+ include_null: bool,
+ credentials_id: str,
+ access_key_id: str,
+ secret_access_key: str,
+ region: str,
+) -> None:
+ """
+    Generate a dataset object from Amazon DynamoDB using the AWS boto3 client.
+ """
+
+ config = ctx.obj["CONFIG"]
+ aws_config = handle_aws_credentials_options(
+ fides_config=config,
+ access_key_id=access_key_id,
+ secret_access_key=secret_access_key,
+ region=region,
+ credentials_id=credentials_id,
+ )
+
+ bigquery_datasets = _dataset.generate_dynamo_db_datasets(aws_config)
+
+ _dataset.write_dataset_manifest(
+ file_name=output_filename, include_null=include_null, datasets=bigquery_datasets
+ )
+
+
@generate.group(name="system")
@click.pass_context
def generate_system(ctx: click.Context) -> None:
diff --git a/src/fides/connectors/aws.py b/src/fides/connectors/aws.py
index e4823bce86..2f050b1430 100644
--- a/src/fides/connectors/aws.py
+++ b/src/fides/connectors/aws.py
@@ -4,13 +4,20 @@
import boto3
from botocore.exceptions import ClientError
-from fideslang.models import System, SystemMetadata
+from fideslang.models import (
+ Dataset,
+ DatasetCollection,
+ DatasetField,
+ System,
+ SystemMetadata,
+)
from fides.connectors.models import (
AWSConfig,
ConnectorAuthFailureException,
ConnectorFailureException,
)
+from fides.core.utils import generate_unique_fides_key
def get_aws_client(service: str, aws_config: Optional[AWSConfig]) -> Any: # type: ignore
@@ -82,6 +89,27 @@ def describe_rds_instances(client: Any) -> Dict[str, List[Dict]]: # type: ignor
return describe_instances
+@handle_common_aws_errors
+def describe_dynamo_tables(client: Any, table_names: List[str]) -> List[Dict]: # type: ignore
+ """
+ Returns describe_table response given a 'dynamodb' boto3 client.
+ """
+ describe_tables = []
+ for table in table_names:
+ described_table = client.describe_table(TableName=table)
+ describe_tables.append(described_table["Table"])
+ return describe_tables
+
+
+@handle_common_aws_errors
+def get_dynamo_tables(client: Any) -> List[str]: # type: ignore
+ """
+    Returns a list of table names given a 'dynamodb' boto3 client.
+ """
+ list_tables = client.list_tables()
+ return list_tables["TableNames"]
+
+
@handle_common_aws_errors
def get_tagging_resources(client: Any) -> List[str]: # type: ignore
"""
@@ -96,6 +124,39 @@ def get_tagging_resources(client: Any) -> List[str]: # type: ignore
return found_arns
+def create_dynamodb_dataset(
+ described_dynamo_tables: List[Dict], organization_key: str = "default_organization"
+) -> Dataset:
+ """
+ Given "describe_table" response(s), build a dataset object to represent
+ each dynamodb table, returning a fides dataset.
+ """
+ # TODO: add something for improved dataset uniqueness, i.e. region/account
+ dataset_name = "DynamoDB"
+ unique_dataset_name = generate_unique_fides_key(dataset_name, "", "")
+ dataset = Dataset(
+ name=dataset_name,
+ fides_key=unique_dataset_name,
+ organization_fides_key=organization_key,
+ collections=[
+ DatasetCollection(
+ name=collection["TableName"],
+ fields=[
+ DatasetField(
+ name=field["AttributeName"],
+ description=f"Fides Generated Description for Column: {field['AttributeName']}",
+ data_categories=[],
+ # TODO: include a fieldsmeta if the field is a primary key or secondary sort (and test for both)
+ )
+ for field in collection["AttributeDefinitions"]
+ ],
+ )
+ for collection in described_dynamo_tables
+ ],
+ )
+ return dataset
+
+
def create_redshift_systems(
describe_clusters: Dict[str, List[Dict]], organization_key: str
) -> List[System]:
diff --git a/src/fides/core/dataset.py b/src/fides/core/dataset.py
index 5adcce74d1..cdbd69ad4b 100644
--- a/src/fides/core/dataset.py
+++ b/src/fides/core/dataset.py
@@ -7,8 +7,14 @@
from pydantic import AnyHttpUrl
from sqlalchemy.engine import Engine
+from fides.connectors.aws import (
+ create_dynamodb_dataset,
+ describe_dynamo_tables,
+ get_aws_client,
+ get_dynamo_tables,
+)
from fides.connectors.bigquery import get_bigquery_engine
-from fides.connectors.models import BigQueryConfig
+from fides.connectors.models import AWSConfig, BigQueryConfig
from fides.core.api_helpers import list_server_resources
from fides.core.parse import parse
@@ -356,3 +362,14 @@ def generate_bigquery_datasets(bigquery_config: BigQueryConfig) -> List[Dataset]
for dataset in bigquery_datasets
]
return unique_bigquery_datasets
+
+
+def generate_dynamo_db_datasets(aws_config: Optional[AWSConfig]) -> Dataset:
+ """
+ Given an AWS config, extract all DynamoDB tables/fields and generate corresponding datasets.
+ """
+ client = get_aws_client(service="dynamodb", aws_config=aws_config)
+ dynamo_tables = get_dynamo_tables(client)
+ described_dynamo_tables = describe_dynamo_tables(client, dynamo_tables)
+ dynamo_dataset = create_dynamodb_dataset(described_dynamo_tables)
+ return dynamo_dataset
diff --git a/tests/conftest.py b/tests/conftest.py
index 99c86a4a8d..70a6588d31 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -44,6 +44,7 @@
from fides.core.config.config_proxy import ConfigProxy
from tests.fixtures.application_fixtures import *
from tests.fixtures.bigquery_fixtures import *
+from tests.fixtures.dynamodb_fixtures import *
from tests.fixtures.email_fixtures import *
from tests.fixtures.fides_connector_example_fixtures import *
from tests.fixtures.integration_fixtures import *
diff --git a/tests/ctl/api/test_generate.py b/tests/ctl/api/test_generate.py
index 43a1817c06..86164cbb4e 100644
--- a/tests/ctl/api/test_generate.py
+++ b/tests/ctl/api/test_generate.py
@@ -29,6 +29,11 @@
"orgUrl": "https://dev-78908748.okta.com",
"token": getenv("OKTA_CLIENT_TOKEN", ""),
},
+ "dynamodb": {
+ "region_name": getenv("DYNAMODB_REGION", ""),
+ "aws_access_key_id": getenv("DYNAMODB_ACCESS_KEY_ID", ""),
+ "aws_secret_access_key": getenv("DYNAMODB_ACCESS_KEY", ""),
+ },
}
EXTERNAL_FAILURE_CONFIG_BODY = {
@@ -50,6 +55,11 @@
"orgUrl": "https://dev-78908748.okta.com",
"token": "INVALID_TOKEN",
},
+ "dynamodb": {
+ "region_name": getenv("DYNAMODB_REGION", ""),
+ "aws_access_key_id": "ILLEGAL_ACCESS_KEY_ID",
+ "aws_secret_access_key": "ILLEGAL_SECRET_ACCESS_KEY_ID",
+ },
}
EXTERNAL_FAILURE_CONFIG_BODY["bigquery"]["keyfile_creds"][
"project_id"
@@ -60,6 +70,7 @@
"okta": "Invalid token provided",
"db": 'FATAL: database "INVALID_DB" does not exist\n\n(Background on this error at: https://sqlalche.me/e/14/e3q8)',
"bigquery": "Invalid project ID 'INVALID_PROJECT_ID'. Project IDs must contain 6-63 lowercase letters, digits, or dashes. Some project IDs also include domain name separated by a colon. IDs must start with a letter and may not end with a dash.",
+ "dynamodb": "The security token included in the request is invalid.",
}
@@ -71,6 +82,7 @@
("systems", "okta"),
("datasets", "db"),
("datasets", "bigquery"),
+ ("datasets", "dynamodb"),
],
)
def test_generate(
@@ -106,6 +118,7 @@ def test_generate(
("systems", "okta"),
("datasets", "db"),
("datasets", "bigquery"),
+ ("datasets", "dynamodb"),
],
)
def test_generate_failure(
diff --git a/tests/fixtures/application_fixtures.py b/tests/fixtures/application_fixtures.py
index c589a73ced..00c1237ffb 100644
--- a/tests/fixtures/application_fixtures.py
+++ b/tests/fixtures/application_fixtures.py
@@ -150,6 +150,15 @@
integration_config, "fides_example.polling_timeout"
),
},
+ "dynamodb_example": {
+ "region": pydash.get(integration_config, "dynamodb_example.region"),
+ "aws_access_key_id": pydash.get(
+ integration_config, "dynamodb_example.aws_access_key_id"
+ ),
+ "aws_secret_access_key": pydash.get(
+ integration_config, "dynamodb_example.aws_secret_access_key"
+ ),
+ },
}
@@ -1709,6 +1718,7 @@ def example_datasets() -> List[Dict]:
"data/dataset/manual_dataset.yml",
"data/dataset/email_dataset.yml",
"data/dataset/remote_fides_example_test_dataset.yml",
+ "data/dataset/dynamodb_example_test_dataset.yml",
]
for filename in example_filenames:
example_datasets += load_dataset(filename)
diff --git a/tests/fixtures/dynamodb_fixtures.py b/tests/fixtures/dynamodb_fixtures.py
new file mode 100644
index 0000000000..fb0d6ad527
--- /dev/null
+++ b/tests/fixtures/dynamodb_fixtures.py
@@ -0,0 +1,94 @@
+import os
+from typing import Any, Dict, Generator, List
+from uuid import uuid4
+
+import pytest
+from sqlalchemy.orm import Session
+
+from fides.api.ctl.sql_models import Dataset as CtlDataset
+from fides.api.ops.models.connectionconfig import (
+ AccessLevel,
+ ConnectionConfig,
+ ConnectionType,
+)
+from fides.api.ops.models.datasetconfig import DatasetConfig
+from fides.api.ops.schemas.connection_configuration.connection_secrets_dynamodb import (
+ DynamoDBSchema,
+)
+
+from .application_fixtures import integration_secrets
+
+
+@pytest.fixture(scope="function")
+def dynamodb_connection_config_without_secrets(db: Session) -> Generator:
+ """
+ Returns a DynamoDB ConnectionConfig without secrets
+ attached that is safe to use in any test.
+ """
+ connection_config = ConnectionConfig.create(
+ db=db,
+ data={
+ "name": str(uuid4()),
+ "key": "my_dynamodb_config",
+ "connection_type": ConnectionType.dynamodb,
+ "access": AccessLevel.write,
+ "secrets": integration_secrets["dynamodb_example"],
+ },
+ )
+ yield connection_config
+ connection_config.delete(db)
+
+
+@pytest.fixture(scope="function")
+def dynamodb_connection_config(
+ db: Any, # type: ignore
+ integration_config: Dict[str, str],
+ dynamodb_connection_config_without_secrets: ConnectionConfig,
+) -> Generator:
+ """
+    Returns a DynamoDB ConnectionConfig with secrets attached if secrets are present
+ in the configuration.
+ """
+ dynamodb_connection_config = dynamodb_connection_config_without_secrets
+ region = integration_config.get("dynamodb_example", {}).get(
+ "region"
+ ) or os.environ.get("DYNAMODB_REGION")
+ aws_access_key_id = integration_config.get("dynamodb_example", {}).get(
+ "aws_access_key_id"
+ ) or os.environ.get("DYNAMODB_ACCESS_KEY_ID")
+ aws_secret_access_key = integration_config.get("dynamodb_example", {}).get(
+ "aws_secret_access_key"
+ ) or os.environ.get("DYNAMODB_ACCESS_KEY")
+ if region is not None:
+ schema = DynamoDBSchema(
+ region_name=region,
+ aws_access_key_id=aws_access_key_id,
+ aws_secret_access_key=aws_secret_access_key,
+ )
+ dynamodb_connection_config.secrets = schema.dict()
+ dynamodb_connection_config.save(db=db)
+ yield dynamodb_connection_config
+
+
+@pytest.fixture
+def dynamodb_example_test_dataset_config(
+ dynamodb_connection_config: ConnectionConfig,
+ db: Any, # type: ignore
+ example_datasets: List[Dict],
+) -> Generator:
+ dataset = example_datasets[11]
+ fides_key = dataset["fides_key"]
+
+ ctl_dataset = CtlDataset.create_from_dataset_dict(db, dataset)
+
+ dataset_config = DatasetConfig.create(
+ db=db,
+ data={
+ "connection_config_id": dynamodb_connection_config.id,
+ "fides_key": fides_key,
+ "ctl_dataset_id": ctl_dataset.id,
+ },
+ )
+ yield dataset_config
+ dataset_config.delete(db=db)
+ ctl_dataset.delete(db=db)
diff --git a/tests/fixtures/integration_fixtures.py b/tests/fixtures/integration_fixtures.py
index b1184d9ced..ef05745bf9 100644
--- a/tests/fixtures/integration_fixtures.py
+++ b/tests/fixtures/integration_fixtures.py
@@ -474,3 +474,20 @@ def mongo_inserts(integration_mongodb_connector):
mongo_delete(
integration_mongodb_connector, "mongo_test", table_name, record_list
)
+
+
+# ======================= dynamodb ==========================
+
+
+@pytest.fixture(scope="function")
+def integration_dynamodb_config(db) -> ConnectionConfig:
+ connection_config = ConnectionConfig(
+ key="dynamodb_example",
+ connection_type=ConnectionType.dynamodb,
+ access=AccessLevel.write,
+ secrets=integration_secrets["dynamodb_example"],
+ name="dynamodb_example",
+ )
+ connection_config.save(db)
+ yield connection_config
+ connection_config.delete(db)
diff --git a/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py b/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py
index f50624e02d..5a88ae3b28 100644
--- a/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py
+++ b/tests/ops/api/v1/endpoints/test_connection_template_endpoints.py
@@ -249,7 +249,7 @@ def test_search_system_type(self, api_client, generate_auth_header, url):
resp = api_client.get(url + "system_type=database", headers=auth_header)
assert resp.status_code == 200
data = resp.json()["items"]
- assert len(data) == 9
+ assert len(data) == 10
def test_search_system_type_and_connection_type(
self,
diff --git a/tests/ops/api/v1/endpoints/test_dataset_endpoints.py b/tests/ops/api/v1/endpoints/test_dataset_endpoints.py
index 259313b39c..3a9c2102c8 100644
--- a/tests/ops/api/v1/endpoints/test_dataset_endpoints.py
+++ b/tests/ops/api/v1/endpoints/test_dataset_endpoints.py
@@ -60,6 +60,8 @@ def test_example_datasets(example_datasets):
assert len(example_datasets[7]["collections"]) == 11
assert example_datasets[9]["fides_key"] == "email_dataset"
assert len(example_datasets[9]["collections"]) == 3
+ assert example_datasets[11]["fides_key"] == "dynamodb_example_test_dataset"
+ assert len(example_datasets[11]["collections"]) == 4
class TestValidateDataset:
@@ -949,7 +951,7 @@ def test_patch_datasets_bulk_create(
)
assert response.status_code == 200
response_body = json.loads(response.text)
- assert len(response_body["succeeded"]) == 11
+ assert len(response_body["succeeded"]) == 12
assert len(response_body["failed"]) == 0
# Confirm that postgres dataset matches the values we provided
@@ -1092,7 +1094,7 @@ def test_patch_datasets_bulk_update(
assert response.status_code == 200
response_body = json.loads(response.text)
- assert len(response_body["succeeded"]) == 11
+ assert len(response_body["succeeded"]) == 12
assert len(response_body["failed"]) == 0
# test postgres
@@ -1343,7 +1345,7 @@ def test_patch_datasets_failed_response(
assert response.status_code == 200 # Returns 200 regardless
response_body = json.loads(response.text)
assert len(response_body["succeeded"]) == 0
- assert len(response_body["failed"]) == 11
+ assert len(response_body["failed"]) == 12
for failed_response in response_body["failed"]:
assert "Dataset create/update failed" in failed_response["message"]
diff --git a/tests/ops/integration_test_config.toml b/tests/ops/integration_test_config.toml
index e34d6a1f52..3705cae106 100644
--- a/tests/ops/integration_test_config.toml
+++ b/tests/ops/integration_test_config.toml
@@ -58,3 +58,8 @@ uri="http://fides:8080"
username="root_user"
password="Testpassword1!"
polling_timeout=1800
+
+[dynamodb_example]
+aws_access_key_id = ""
+aws_secret_access_key = ""
+region = ""
diff --git a/tests/ops/service/connectors/test_queryconfig.py b/tests/ops/service/connectors/test_queryconfig.py
index 936614ac1b..753f654053 100644
--- a/tests/ops/service/connectors/test_queryconfig.py
+++ b/tests/ops/service/connectors/test_queryconfig.py
@@ -1,6 +1,8 @@
+from datetime import datetime, timezone
from typing import Any, Dict, Set
import pytest
+from boto3.dynamodb.types import TypeDeserializer, TypeSerializer
from fideslang.models import Dataset
from fides.api.ops.graph.config import (
@@ -17,6 +19,7 @@
from fides.api.ops.schemas.masking.masking_configuration import HashMaskingConfiguration
from fides.api.ops.schemas.masking.masking_secrets import MaskingSecretCache, SecretType
from fides.api.ops.service.connectors.query_config import (
+ DynamoDBQueryConfig,
MongoQueryConfig,
SQLQueryConfig,
)
@@ -606,3 +609,117 @@ def test_generate_update_stmt_multiple_rules(
["1988-01-10"], request_id=privacy_request.id
)[0]
)
+
+
+class TestDynamoDBQueryConfig:
+ @pytest.fixture(scope="function")
+ def identity(self):
+ identity = {"email": "customer-test_uuid@example.com"}
+ return identity
+
+ @pytest.fixture(scope="function")
+ def dataset_graph(self, integration_dynamodb_config, example_datasets):
+ dataset = Dataset(**example_datasets[11])
+ dataset_graph = convert_dataset_to_graph(
+ dataset, integration_dynamodb_config.key
+ )
+
+ return DatasetGraph(*[dataset_graph])
+
+ @pytest.fixture(scope="function")
+ def traversal(self, identity, dataset_graph):
+ dynamo_traversal = Traversal(dataset_graph, identity)
+ return dynamo_traversal
+
+ @pytest.fixture(scope="function")
+ def customer_node(self, traversal):
+ return traversal.traversal_node_dict[
+ CollectionAddress("dynamodb_example_test_dataset", "customer")
+ ]
+
+ @pytest.fixture(scope="function")
+ def customer_identifier_node(self, traversal):
+ return traversal.traversal_node_dict[
+ CollectionAddress("dynamodb_example_test_dataset", "customer_identifier")
+ ]
+
+ @pytest.fixture(scope="function")
+ def customer_row(self):
+ row = {
+ "customer_email": {"S": "customer-1@example.com"},
+ "name": {"S": "John Customer"},
+ "address_id": {"L": [{"S": "1"}, {"S": "2"}]},
+ "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}},
+ "id": {"S": "1"},
+ }
+ return row
+
+ @pytest.fixture(scope="function")
+ def deserialized_customer_row(self, customer_row):
+ deserialized_customer_row = {}
+ deserializer = TypeDeserializer()
+ for key, value in customer_row.items():
+ deserialized_customer_row[key] = deserializer.deserialize(value)
+ return deserialized_customer_row
+
+ @pytest.fixture(scope="function")
+ def customer_identifier_row(self):
+ row = {
+ "customer_id": {"S": "customer-1@example.com"},
+ "email": {"S": "customer-1@example.com"},
+ "name": {"S": "Customer 1"},
+ "created": {"S": datetime.now(timezone.utc).isoformat()},
+ }
+ return row
+
+ @pytest.fixture(scope="function")
+ def deserialized_customer_identifier_row(self, customer_identifier_row):
+ deserialized_customer_identifier_row = {}
+ deserializer = TypeDeserializer()
+ for key, value in customer_identifier_row.items():
+ deserialized_customer_identifier_row[key] = deserializer.deserialize(value)
+ return deserialized_customer_identifier_row
+
+ def test_get_query_param_formatting_single_key(
+ self,
+ resources_dict,
+ customer_node,
+ ) -> None:
+ input_data = {
+ "fidesops_grouped_inputs": [],
+ "email": ["customer-test_uuid@example.com"],
+ }
+ attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}]
+ query_config = DynamoDBQueryConfig(customer_node, attribute_definitions)
+ item = query_config.generate_query(
+ input_data=input_data, policy=resources_dict["policy"]
+ )
+ assert item["ExpressionAttributeValues"] == {
+ ":value": {"S": "customer-test_uuid@example.com"}
+ }
+ assert item["KeyConditionExpression"] == "email = :value"
+
+ def test_put_query_param_formatting_single_key(
+ self,
+ erasure_policy,
+ customer_node,
+ deserialized_customer_row,
+ ) -> None:
+ input_data = {
+ "fidesops_grouped_inputs": [],
+ "email": ["customer-test_uuid@example.com"],
+ }
+ attribute_definitions = [{"AttributeName": "email", "AttributeType": "S"}]
+ query_config = DynamoDBQueryConfig(customer_node, attribute_definitions)
+ update_item = query_config.generate_update_stmt(
+ deserialized_customer_row, erasure_policy, privacy_request
+ )
+
+ assert update_item == {
+ "customer_email": {"S": "customer-1@example.com"},
+ "name": {"NULL": True},
+        # address_id is untouched by the name-only erasure policy
+ "address_id": {"L": [{"S": "1"}, {"S": "2"}]},
+ "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}},
+ "id": {"S": "1"},
+ }
diff --git a/tests/ops/service/privacy_request/test_request_runner_service.py b/tests/ops/service/privacy_request/test_request_runner_service.py
index 544791a6d6..c99c6769ff 100644
--- a/tests/ops/service/privacy_request/test_request_runner_service.py
+++ b/tests/ops/service/privacy_request/test_request_runner_service.py
@@ -1,4 +1,6 @@
+# pylint: disable=missing-docstring, redefined-outer-name
import time
+from datetime import datetime, timezone
from typing import Any, Dict, List, Set
from unittest import mock
from unittest.mock import ANY, Mock, call
@@ -6,6 +8,7 @@
import pydash
import pytest
+from boto3.dynamodb.types import TypeDeserializer
from pydantic import ValidationError
from sqlalchemy import column, select, table
from sqlalchemy.orm import Session
@@ -43,6 +46,7 @@
from fides.api.ops.schemas.redis_cache import Identity
from fides.api.ops.schemas.saas.saas_config import SaaSRequest
from fides.api.ops.schemas.saas.shared_schemas import HTTPMethod, SaaSRequestParams
+from fides.api.ops.service.connectors.dynamodb_connector import DynamoDBConnector
from fides.api.ops.service.connectors.saas_connector import SaaSConnector
from fides.api.ops.service.connectors.sql_connector import (
RedshiftConnector,
@@ -2254,3 +2258,193 @@ def test_needs_batch_email_send_new_workflow(
{"email": "customer-1@example.com", "ljt_readerID": "12345"},
privacy_request_with_consent_policy,
)
+
+
+@pytest.fixture(scope="function")
+def dynamodb_resources(
+ dynamodb_example_test_dataset_config,
+):
+ dynamodb_connection_config = dynamodb_example_test_dataset_config.connection_config
+ dynamodb_client = DynamoDBConnector(dynamodb_connection_config).client()
+ uuid = str(uuid4())
+ customer_email = f"customer-{uuid}@example.com"
+ customer_name = f"{uuid}"
+
+    # Seed each DynamoDB table with rows tied to this test's unique identity
+ items = {
+ "customer_identifier": [
+ {
+ "customer_id": {"S": customer_name},
+ "email": {"S": customer_email},
+ "name": {"S": customer_name},
+ "created": {"S": datetime.now(timezone.utc).isoformat()},
+ }
+ ],
+ "customer": [
+ {
+ "id": {"S": customer_name},
+ "name": {"S": customer_name},
+ "email": {"S": customer_email},
+ "address_id": {"L": [{"S": customer_name}, {"S": customer_name}]},
+ "personal_info": {"M": {"gender": {"S": "male"}, "age": {"S": "99"}}},
+ "created": {"S": datetime.now(timezone.utc).isoformat()},
+ }
+ ],
+ "address": [
+ {
+ "id": {"S": customer_name},
+ "city": {"S": "city"},
+ "house": {"S": "house"},
+ "state": {"S": "state"},
+ "street": {"S": "street"},
+ "zip": {"S": "zip"},
+ }
+ ],
+ "login": [
+ {
+ "customer_id": {"S": customer_name},
+ "login_date": {"S": "2023-01-01"},
+ "name": {"S": customer_name},
+ "email": {"S": customer_email},
+ },
+ {
+ "customer_id": {"S": customer_name},
+ "login_date": {"S": "2023-01-02"},
+ "name": {"S": customer_name},
+ "email": {"S": customer_email},
+ },
+ ],
+ }
+
+ for table_name, rows in items.items():
+ for item in rows:
+ res = dynamodb_client.put_item(
+ TableName=table_name,
+ Item=item,
+ )
+ assert res["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+ yield {
+ "email": customer_email,
+ "formatted_email": customer_email,
+ "name": customer_name,
+ "customer_id": uuid,
+ "client": dynamodb_client,
+ }
+ # Remove test data and close Dynamodb connection in teardown
+ delete_items = {
+ "customer_identifier": [{"email": {"S": customer_email}}],
+ "customer": [{"id": {"S": customer_name}}],
+ "address": [{"id": {"S": customer_name}}],
+ "login": [
+ {
+ "customer_id": {"S": customer_name},
+ "login_date": {"S": "2023-01-01"},
+ },
+ {
+ "customer_id": {"S": customer_name},
+ "login_date": {"S": "2023-01-02"},
+ },
+ ],
+ }
+ for table_name, rows in delete_items.items():
+ for item in rows:
+ res = dynamodb_client.delete_item(
+ TableName=table_name,
+ Key=item,
+ )
+ assert res["ResponseMetadata"]["HTTPStatusCode"] == 200
+
+
+@pytest.mark.integration_external
+@pytest.mark.integration_dynamodb
+def test_create_and_process_access_request_dynamodb(
+ dynamodb_resources,
+ db,
+ cache,
+ policy,
+ run_privacy_request_task,
+):
+ customer_email = dynamodb_resources["email"]
+ customer_name = dynamodb_resources["name"]
+ customer_id = dynamodb_resources["customer_id"]
+ data = {
+ "requested_at": "2021-08-30T16:09:37.359Z",
+ "policy_key": policy.key,
+ "identity": {"email": customer_email},
+ }
+
+ pr = get_privacy_request_results(
+ db,
+ policy,
+ run_privacy_request_task,
+ data,
+ task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL,
+ )
+ results = pr.get_results()
+ customer_table_key = (
+ f"EN_{pr.id}__access_request__dynamodb_example_test_dataset:customer"
+ )
+ address_table_key = (
+ f"EN_{pr.id}__access_request__dynamodb_example_test_dataset:address"
+ )
+ login_table_key = f"EN_{pr.id}__access_request__dynamodb_example_test_dataset:login"
+ assert len(results[customer_table_key]) == 1
+ assert len(results[address_table_key]) == 2
+ assert len(results[login_table_key]) == 2
+ assert results[customer_table_key][0]["email"] == customer_email
+ assert results[customer_table_key][0]["name"] == customer_name
+ assert results[customer_table_key][0]["id"] == customer_id
+ assert results[address_table_key][0]["id"] == customer_id
+ assert results[login_table_key][0]["name"] == customer_name
+
+ pr.delete(db=db)
+
+
+@pytest.mark.integration_external
+@pytest.mark.integration_dynamodb
+def test_create_and_process_erasure_request_dynamodb(
+ dynamodb_example_test_dataset_config,
+ dynamodb_resources,
+ integration_config: Dict[str, str],
+ db,
+ cache,
+ erasure_policy,
+ run_privacy_request_task,
+):
+ customer_email = dynamodb_resources["email"]
+ dynamodb_client = dynamodb_resources["client"]
+ customer_id = dynamodb_resources["customer_id"]
+ customer_name = dynamodb_resources["name"]
+ data = {
+ "requested_at": "2021-08-30T16:09:37.359Z",
+ "policy_key": erasure_policy.key,
+ "identity": {"email": customer_email},
+ }
+ pr = get_privacy_request_results(
+ db,
+ erasure_policy,
+ run_privacy_request_task,
+ data,
+ task_timeout=PRIVACY_REQUEST_TASK_TIMEOUT_EXTERNAL,
+ )
+ pr.delete(db=db)
+ deserializer = TypeDeserializer()
+ customer = dynamodb_client.get_item(
+ TableName="customer",
+ Key={"id": {"S": customer_id}},
+ )
+ customer_identifier = dynamodb_client.get_item(
+ TableName="customer_identifier",
+ Key={"email": {"S": customer_email}},
+ )
+ login = dynamodb_client.get_item(
+ TableName="login",
+ Key={
+ "customer_id": {"S": customer_name},
+ "login_date": {"S": "2023-01-01"},
+ },
+ )
+ assert deserializer.deserialize(customer["Item"]["name"]) == None
+ assert deserializer.deserialize(customer_identifier["Item"]["name"]) == None
+ assert deserializer.deserialize(login["Item"]["name"]) == None