Skip to content

Commit

Permalink
DynamoDB Full: Improve error handling wrt. bulk operations vs. usability
Browse files Browse the repository at this point in the history
Use `BulkProcessor` in the same spirit like it has been conceived for
MongoDB.
  • Loading branch information
amotl committed Sep 16, 2024
1 parent 999bbb1 commit c0d08a7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- DynamoDB CDC: Add `ctk load table` interface for processing CDC events
- DynamoDB CDC: Accept a few more options for the Kinesis Stream:
batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time
- DynamoDB Full: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
64 changes: 39 additions & 25 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
# ruff: noqa: S608
import logging
import typing as t

import sqlalchemy as sa
from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator
from tqdm import tqdm
from yarl import URL

from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

Expand All @@ -23,9 +26,12 @@ def __init__(
self,
dynamodb_url: str,
cratedb_url: str,
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname
Expand All @@ -37,6 +43,7 @@ def __init__(
self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table)
self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table)

self.on_error = on_error
self.progress = progress
self.debug = debug

Expand All @@ -49,36 +56,43 @@ def start(self):
"""
records_in = self.dynamodb_adapter.count_records(self.dynamodb_table)
logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception

with self.cratedb_adapter.engine.connect() as connection:
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out = 0
for result in self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
batch_size=self.batch_size,
):
result_size = len(result["Items"])
try:
operation = self.translator.to_sql(result["Items"])
except Exception as ex:
logger_on_error(f"Transforming query failed: {ex}")
continue
try:
connection.execute(sa.text(operation.statement), operation.parameters)
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(f"Executing query failed: {ex}")
progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

processor = BulkProcessor(
connection=connection,
data=self.fetch(),
batch_to_operation=self.translator.to_sql,
progress_bar=progress_bar,
on_error=self.on_error,
debug=self.debug,
)
metrics = processor.start()
logger.info(f"Bulk processor metrics: {metrics}")

logger.info(
"Number of records written: "
f"success={metrics.count_success_total}, error={metrics.count_error_total}"
)
if metrics.count_success_total == 0:
logger.warning("No data has been copied")

return True

def fetch(self) -> t.Generator[t.List[t.Dict[str, t.Any]], None, None]:
"""
Fetch data from DynamoDB. Generate batches of items.
"""
data = self.dynamodb_adapter.scan(
table_name=self.dynamodb_table,
consistent_read=self.consistent_read,
batch_size=self.batch_size,
)
for result in data:
yield result["Items"]
57 changes: 48 additions & 9 deletions tests/io/dynamodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,24 @@
pytestmark = pytest.mark.dynamodb


RECORD = {
"Id": {"N": "101"},
}


def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
Verify `DynamoDBFullLoad` works as expected.
Verify a basic `DynamoDBFullLoad` works as expected.
"""

data_in = {
"Id": {"N": "101"},
}
data_out = {
"Id": 101.0,
}

# Define source and target URLs.
dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=[RECORD])
dynamodb_test_manager.load_records(table_name="demo", records=[data_in])

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
Expand All @@ -32,4 +34,41 @@ def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager)
assert cratedb.database.count_records("testdrive.demo") == 1

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608
assert results[0]["data"] == {"Id": 101.0}
assert results[0]["data"] == data_out


def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_manager):
"""
Verify a basic `DynamoDBFullLoad` works as expected, this time omitting a warning on an invalid record.
"""

data_in = [
{"Id": {"N": "1"}, "name": {"S": "Foo"}},
{"Id": {"N": "2"}, "name": {"S": "Bar"}, "nested_array": {"L": [{"L": [{"N": "1"}, {"N": "2"}]}]}},
{"Id": {"N": "3"}, "name": {"S": "Baz"}},
]
data_out = [
{"data": {"Id": 1, "name": "Foo"}, "aux": {}},
{"data": {"Id": 3, "name": "Baz"}, "aux": {}},
]

# Define source and target URLs.
dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1"
cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

# Populate source database with data.
dynamodb_test_manager.load_records(table_name="demo", records=data_in)

# Run transfer command.
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url)
table_loader.start()

# Verify data in target database.
assert cratedb.database.table_exists("testdrive.demo") is True
assert cratedb.database.refresh_table("testdrive.demo") is True
assert cratedb.database.count_records("testdrive.demo") == 2

results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY data['Id'];", records=True) # noqa: S608
assert results == data_out

assert "Dynamic nested arrays are not supported" in caplog.text

0 comments on commit c0d08a7

Please sign in to comment.