From c0d08a7fe29bb688fa0a1075a5c6544b23be04eb Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Fri, 13 Sep 2024 23:48:56 +0200 Subject: [PATCH] DynamoDB Full: Improve error handling wrt. bulk operations vs. usability Use `BulkProcessor` in the same spirit as it was conceived for MongoDB. --- CHANGES.md | 1 + cratedb_toolkit/io/dynamodb/copy.py | 64 ++++++++++++++++++----------- tests/io/dynamodb/test_copy.py | 57 +++++++++++++++++++++---- 3 files changed, 88 insertions(+), 34 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index c58595f8..247fd2e1 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -10,6 +10,7 @@ - DynamoDB CDC: Add `ctk load table` interface for processing CDC events - DynamoDB CDC: Accept a few more options for the Kinesis Stream: batch-size, create, create-shards, start, seqno, idle-sleep, buffer-time +- DynamoDB Full: Improve error handling wrt. bulk operations vs. usability ## 2024/09/10 v0.0.22 - MongoDB: Rename columns with leading underscores to use double leading underscores diff --git a/cratedb_toolkit/io/dynamodb/copy.py b/cratedb_toolkit/io/dynamodb/copy.py index c059faa5..4f5a59d4 100644 --- a/cratedb_toolkit/io/dynamodb/copy.py +++ b/cratedb_toolkit/io/dynamodb/copy.py @@ -1,13 +1,16 @@ # ruff: noqa: S608 import logging +import typing as t import sqlalchemy as sa from commons_codec.transform.dynamodb import DynamoDBFullLoadTranslator from tqdm import tqdm from yarl import URL +from cratedb_toolkit.io.core import BulkProcessor from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter from cratedb_toolkit.model import DatabaseAddress +from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany from cratedb_toolkit.util import DatabaseAdapter from cratedb_toolkit.util.data import asbool @@ -23,9 +26,12 @@ def __init__( self, dynamodb_url: str, cratedb_url: str, + on_error: t.Literal["ignore", "raise"] = "ignore", progress: bool = False, debug: bool = True, ): + monkeypatch_executemany() + cratedb_address = 
DatabaseAddress.from_string(cratedb_url) cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode() cratedb_table = cratedb_table_address.fullname @@ -37,6 +43,7 @@ def __init__( self.cratedb_table = self.cratedb_adapter.quote_relation_name(cratedb_table) self.translator = DynamoDBFullLoadTranslator(table_name=self.cratedb_table) + self.on_error = on_error self.progress = progress self.debug = debug @@ -49,9 +56,7 @@ def start(self): """ records_in = self.dynamodb_adapter.count_records(self.dynamodb_table) logger.info(f"Source: DynamoDB table={self.dynamodb_table} count={records_in}") - logger_on_error = logger.warning - if self.debug: - logger_on_error = logger.exception + with self.cratedb_adapter.engine.connect() as connection: if not self.cratedb_adapter.table_exists(self.cratedb_table): connection.execute(sa.text(self.translator.sql_ddl)) @@ -59,26 +64,35 @@ def start(self): records_target = self.cratedb_adapter.count_records(self.cratedb_table) logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}") progress_bar = tqdm(total=records_in) - records_out = 0 - for result in self.dynamodb_adapter.scan( - table_name=self.dynamodb_table, - consistent_read=self.consistent_read, - batch_size=self.batch_size, - ): - result_size = len(result["Items"]) - try: - operation = self.translator.to_sql(result["Items"]) - except Exception as ex: - logger_on_error(f"Transforming query failed: {ex}") - continue - try: - connection.execute(sa.text(operation.statement), operation.parameters) - records_out += result_size - progress_bar.update(n=result_size) - except Exception as ex: - logger_on_error(f"Executing query failed: {ex}") - progress_bar.close() - connection.commit() - logger.info(f"Number of records written: {records_out}") - if records_out == 0: + + processor = BulkProcessor( + connection=connection, + data=self.fetch(), + batch_to_operation=self.translator.to_sql, + progress_bar=progress_bar, + on_error=self.on_error, + 
debug=self.debug, + ) + metrics = processor.start() + logger.info(f"Bulk processor metrics: {metrics}") + + logger.info( + "Number of records written: " + f"success={metrics.count_success_total}, error={metrics.count_error_total}" + ) + if metrics.count_success_total == 0: logger.warning("No data has been copied") + + return True + + def fetch(self) -> t.Generator[t.List[t.Dict[str, t.Any]], None, None]: + """ + Fetch data from DynamoDB. Generate batches of items. + """ + data = self.dynamodb_adapter.scan( + table_name=self.dynamodb_table, + consistent_read=self.consistent_read, + batch_size=self.batch_size, + ) + for result in data: + yield result["Items"] diff --git a/tests/io/dynamodb/test_copy.py b/tests/io/dynamodb/test_copy.py index 609f29b0..984cfead 100644 --- a/tests/io/dynamodb/test_copy.py +++ b/tests/io/dynamodb/test_copy.py @@ -5,22 +5,24 @@ pytestmark = pytest.mark.dynamodb -RECORD = { - "Id": {"N": "101"}, -} - - -def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager): +def test_dynamodb_copy_basic_success(caplog, cratedb, dynamodb, dynamodb_test_manager): """ - Verify `DynamoDBFullLoad` works as expected. + Verify a basic `DynamoDBFullLoad` works as expected. """ + data_in = { + "Id": {"N": "101"}, + } + data_out = { + "Id": 101.0, + } + # Define source and target URLs. dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" # Populate source database with data. - dynamodb_test_manager.load_records(table_name="demo", records=[RECORD]) + dynamodb_test_manager.load_records(table_name="demo", records=[data_in]) # Run transfer command. 
table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url) @@ -32,4 +34,41 @@ def test_dynamodb_copy_success(caplog, cratedb, dynamodb, dynamodb_test_manager) assert cratedb.database.count_records("testdrive.demo") == 1 results = cratedb.database.run_sql("SELECT * FROM testdrive.demo;", records=True) # noqa: S608 - assert results[0]["data"] == {"Id": 101.0} + assert results[0]["data"] == data_out + + +def test_dynamodb_copy_basic_warning(caplog, cratedb, dynamodb, dynamodb_test_manager): + """ + Verify a basic `DynamoDBFullLoad` works as expected, this time emitting a warning on an invalid record. + """ + + data_in = [ + {"Id": {"N": "1"}, "name": {"S": "Foo"}}, + {"Id": {"N": "2"}, "name": {"S": "Bar"}, "nested_array": {"L": [{"L": [{"N": "1"}, {"N": "2"}]}]}}, + {"Id": {"N": "3"}, "name": {"S": "Baz"}}, + ] + data_out = [ + {"data": {"Id": 1, "name": "Foo"}, "aux": {}}, + {"data": {"Id": 3, "name": "Baz"}, "aux": {}}, + ] + + # Define source and target URLs. + dynamodb_url = f"{dynamodb.get_connection_url_dynamodb()}/demo?region=us-east-1" + cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo" + + # Populate source database with data. + dynamodb_test_manager.load_records(table_name="demo", records=data_in) + + # Run transfer command. + table_loader = DynamoDBFullLoad(dynamodb_url=dynamodb_url, cratedb_url=cratedb_url) + table_loader.start() + + # Verify data in target database. + assert cratedb.database.table_exists("testdrive.demo") is True + assert cratedb.database.refresh_table("testdrive.demo") is True + assert cratedb.database.count_records("testdrive.demo") == 2 + + results = cratedb.database.run_sql("SELECT * FROM testdrive.demo ORDER BY data['Id'];", records=True) # noqa: S608 + assert results == data_out + + assert "Dynamic nested arrays are not supported" in caplog.text