diff --git a/CHANGES.md b/CHANGES.md
index e1b61217..f16e011d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -6,6 +6,7 @@
   or from filesystem directory
 - MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
 - MongoDB: Optionally filter server collection using MongoDB query expression
+- MongoDB: Improve error handling wrt. bulk operations vs. usability
 
 ## 2024/09/10 v0.0.22
 - MongoDB: Rename columns with leading underscores to use double leading underscores
diff --git a/cratedb_toolkit/io/core.py b/cratedb_toolkit/io/core.py
new file mode 100644
index 00000000..d793b899
--- /dev/null
+++ b/cratedb_toolkit/io/core.py
@@ -0,0 +1,197 @@
+# TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration?
+import typing as t
+from functools import cached_property
+
+import sqlalchemy as sa
+from attr import Factory
+from attrs import define
+from commons_codec.model import SQLOperation
+from pympler.asizeof import asizeof
+from sqlalchemy.exc import ProgrammingError
+from tqdm import tqdm
+
+from cratedb_toolkit.util.database import logger
+
+
+class BulkResultItem(t.TypedDict):
+    """
+    Define the shape of a CrateDB bulk request response item.
+    """
+
+    rowcount: int
+
+
+@define
+class BulkResponse:
+    """
+    Manage CrateDB bulk request responses.
+    Accepts a list of bulk arguments (parameter list) and a list of bulk response items.
+
+    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
+
+    TODO: Think about refactoring this to `sqlalchemy_cratedb.support`.
+    """
+
+    parameters: t.Union[t.List[t.Dict[str, t.Any]], None]
+    cratedb_bulk_result: t.Union[t.List[BulkResultItem], None]
+
+    @cached_property
+    def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
+        """
+        Compute list of failed records.
+
+        CrateDB signals failed insert using `rowcount=-2`.
+
+        https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
+        """
+        if self.parameters is None or self.cratedb_bulk_result is None:
+            return []
+        errors: t.List[t.Dict[str, t.Any]] = []
+        for record, status in zip(self.parameters, self.cratedb_bulk_result):
+            if status["rowcount"] == -2:
+                errors.append(record)
+        return errors
+
+    @cached_property
+    def parameter_count(self) -> int:
+        """
+        Compute bulk size / length of parameter list.
+        """
+        if not self.parameters:
+            return 0
+        return len(self.parameters)
+
+    @cached_property
+    def success_count(self) -> int:
+        """
+        Compute number of succeeding records within a batch.
+        """
+        return self.parameter_count - self.failed_count
+
+    @cached_property
+    def failed_count(self) -> int:
+        """
+        Compute number of failed records within a batch.
+        """
+        return len(self.failed_records)
+
+
+@define
+class BulkMetrics:
+    """
+    Manage a few details for a `BulkProcessor` task.
+    """
+
+    count_success_total: int = 0
+    count_error_total: int = 0
+    bytes_write_total: int = 0
+    bytes_error_total: int = 0
+    rate_current: int = 0
+    rate_max: int = 0
+
+
+@define
+class BulkProcessor:
+    """
+    Generic driver to run a bulk operation against CrateDB, which can fall back to record-by-record operation.
+
+    It aims to provide a combination of both performance/efficiency by using bulk operations,
+    and also good usability and on-the-spot error message for records that fail to insert.
+
+    Background: This is a canonical client-side API wrapper for CrateDB's bulk operations HTTP endpoint.
+    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
+    """
+
+    connection: sa.Connection
+    data: t.Iterable[t.List[t.Dict[str, t.Any]]]
+    batch_to_operation: t.Callable[[t.List[t.Dict[str, t.Any]]], SQLOperation]
+    progress_bar: t.Union[tqdm, None] = None
+    on_error: t.Literal["ignore", "raise"] = "ignore"
+    debug: bool = False
+
+    _metrics: BulkMetrics = Factory(BulkMetrics)
+
+    @cached_property
+    def log_level(self):
+        """
+        Select the logging function used to report batch conversion failures.
+
+        Returns `logger.exception` when `debug` is enabled, otherwise `logger.warning`.
+        """
+        if self.debug:
+            return logger.exception
+        else:
+            return logger.warning
+
+    def start(self) -> BulkMetrics:
+        """
+        Run the bulk operation over all batches, and return the collected metrics.
+
+        Records flagged as failed within a batch are submitted again one by one, in order to relay
+        individual error messages. With `on_error="raise"`, a failure is re-raised after logging.
+        """
+        # Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
+        for batch in self.data:
+            current_batch_size = len(batch)
+
+            # Compare with `None` explicitly: A `tqdm` instance is falsy when `total=0`.
+            if self.progress_bar is not None:
+                self.progress_bar.set_description("ACQUIRE")
+
+            try:
+                operation = self.batch_to_operation(batch)
+            except Exception as ex:
+                self.log_level(f"Computing query failed: {ex}")
+                if self.on_error == "raise":
+                    raise
+                continue
+
+            self._metrics.bytes_write_total += asizeof(operation)
+            statement = sa.text(operation.statement)
+
+            # Submit operation to CrateDB, using `bulk_args`.
+            if self.progress_bar is not None:
+                self.progress_bar.set_description("SUBMIT ")
+            try:
+                cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
+                self.connection.commit()
+                cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
+                bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
+                failed_records = bulk_response.failed_records
+                count_success_local = bulk_response.success_count
+                self._metrics.count_success_total += bulk_response.success_count
+                if self.progress_bar is not None:
+                    self.progress_bar.update(n=bulk_response.success_count)
+
+            # When a batch is of size one, an exception is raised.
+            # Just signal the same condition as if a batch would have failed.
+            except ProgrammingError:
+                failed_records = [operation.parameters]
+                count_success_local = 0
+
+            # When bulk operations fail, try inserting failed records record-by-record,
+            # in order to relay proper error messages to the user.
+            if failed_records:
+                logger.warning(
+                    f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
+                    f"Falling back to per-record operations."
+                )
+                for record in failed_records:
+                    try:
+                        self.connection.execute(statement=statement, parameters=record)
+                        self.connection.commit()
+                        self._metrics.count_success_total += 1
+                    except Exception as ex:
+                        logger.warning(f"Operation failed: {ex}")
+                        logger.debug(f"Failing record: {record}")
+                        self._metrics.count_error_total += 1
+                        self._metrics.bytes_error_total += asizeof(record)
+                        if self.on_error == "raise":
+                            raise
+                    if self.progress_bar is not None:
+                        self.progress_bar.update(n=1)
+
+        if self.progress_bar is not None:
+            self.progress_bar.close()
+
+        return self._metrics
diff --git a/cratedb_toolkit/io/mongodb/api.py b/cratedb_toolkit/io/mongodb/api.py
index 83a9f30f..12c05cad 100644
--- a/cratedb_toolkit/io/mongodb/api.py
+++ b/cratedb_toolkit/io/mongodb/api.py
@@ -168,8 +168,8 @@ def mongodb_copy(
     for task in tasks:
         try:
             outcome_task = task.start()
-        except (Exception, PanicException):
-            logger.exception("Task failed")
+        except (Exception, PanicException) as ex:
+            logger.error(f"Task failed: {ex}")
             outcome_task = False
         outcome = outcome and outcome_task
 
diff --git a/cratedb_toolkit/io/mongodb/copy.py b/cratedb_toolkit/io/mongodb/copy.py
index 893a4282..3c312b78 100644
--- a/cratedb_toolkit/io/mongodb/copy.py
+++ b/cratedb_toolkit/io/mongodb/copy.py
@@ -11,11 +11,13 @@ from tqdm.contrib.logging import logging_redirect_tqdm
 from zyp.model.collection import CollectionAddress
 
+from cratedb_toolkit.io.core import BulkProcessor
 from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
 from cratedb_toolkit.io.mongodb.export import CrateDBConverter
 from cratedb_toolkit.io.mongodb.model import DocumentDict
 from cratedb_toolkit.io.mongodb.transform import TransformationManager
 from cratedb_toolkit.model import DatabaseAddress
+from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
 from cratedb_toolkit.util import DatabaseAdapter
 
 logger = logging.getLogger(__name__)
 
@@ -44,9 +46,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat
         """
         Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
         """
-        if isinstance(data, Cursor):
-            data = list(data)
-        if not isinstance(data, list):
+        if not isinstance(data, Cursor) and not isinstance(data, list):
             data = [data]
 
         # Define SQL INSERT statement.
@@ -72,10 +72,12 @@ def __init__(
         mongodb_url: t.Union[str, URL],
         cratedb_url: t.Union[str, URL],
         tm: t.Union[TransformationManager, None],
-        on_error: t.Literal["ignore", "raise"] = "raise",
+        on_error: t.Literal["ignore", "raise"] = "ignore",
         progress: bool = False,
         debug: bool = True,
     ):
+        monkeypatch_executemany()
+
         self.mongodb_uri = URL(mongodb_url)
         self.cratedb_uri = URL(cratedb_url)
 
@@ -114,9 +116,6 @@ def start(self):
         logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
         records_in = self.mongodb_adapter.record_count()
         logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
-        logger_on_error = logger.warning
-        if self.debug:
-            logger_on_error = logger.exception
         with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
             if not self.cratedb_adapter.table_exists(self.cratedb_table):
                 connection.execute(sa.text(self.translator.sql_ddl))
@@ -124,41 +123,23 @@ def start(self):
             records_target = self.cratedb_adapter.count_records(self.cratedb_table)
             logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
             progress_bar = tqdm(total=records_in)
-            records_out: int = 0
-
-            # Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
-            for documents in self.mongodb_adapter.query():
-                progress_bar.set_description("ACQUIRE")
-
-                try:
-                    operation = self.translator.to_sql(documents)
-                except Exception as ex:
-                    logger_on_error(f"Computing query failed: {ex}")
-                    if self.on_error == "raise":
-                        raise
-                    continue
-
-                # Submit operation to CrateDB.
-                progress_bar.set_description("SUBMIT ")
-                try:
-                    result = connection.execute(sa.text(operation.statement), operation.parameters)
-                    result_size = result.rowcount
-                    if result_size < 0:
-                        raise IOError("Unable to insert one or more records")
-                    records_out += result_size
-                    progress_bar.update(n=result_size)
-                except Exception as ex:
-                    logger_on_error(
-                        f"Executing operation failed: {ex}\n"
-                        f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
-                    )
-                    if self.on_error == "raise":
-                        raise
-                    continue
-
-            progress_bar.close()
-            connection.commit()
-        logger.info(f"Number of records written: {records_out}")
-        if records_out == 0:
+
+            processor = BulkProcessor(
+                connection=connection,
+                data=self.mongodb_adapter.query(),
+                batch_to_operation=self.translator.to_sql,
+                progress_bar=progress_bar,
+                on_error=self.on_error,
+                debug=self.debug,
+            )
+            metrics = processor.start()
+            logger.info(f"Bulk processor metrics: {metrics}")
+
+        logger.info(
+            "Number of records written: "
+            f"success={metrics.count_success_total}, error={metrics.count_error_total}"
+        )
+        if metrics.count_success_total == 0:
             logger.warning("No data has been copied")
+
         return True
diff --git a/cratedb_toolkit/sqlalchemy/__init__.py b/cratedb_toolkit/sqlalchemy/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/cratedb_toolkit/sqlalchemy/patch.py b/cratedb_toolkit/sqlalchemy/patch.py
new file mode 100644
index 00000000..9537f5bc
--- /dev/null
+++ b/cratedb_toolkit/sqlalchemy/patch.py
@@ -0,0 +1,19 @@
+from sqlalchemy_cratedb import dialect
+
+
+def do_executemany(self, cursor, statement, parameters, context=None):
+    """
+    Improved version of `do_executemany` that stores its response into the request context instance.
+
+    TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
+    """
+    result = cursor.executemany(statement, parameters)
+    if context is not None:
+        context.last_executemany_result = result
+
+
+def monkeypatch_executemany():
+    """
+    Enable improved version of `do_executemany`.
+    """
+    dialect.do_executemany = do_executemany
diff --git a/pyproject.toml b/pyproject.toml
index b455e439..135396e3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -95,6 +95,7 @@ dependencies = [
   'importlib-metadata; python_version < "3.8"',
   'importlib-resources; python_version < "3.9"',
   "polars<1.7",
+  "pympler<1.2",
   "python-dateutil<3",
   "python-dotenv<2",
   "python-slugify<9",
diff --git a/tests/io/mongodb/mixed.ndjson b/tests/io/mongodb/mixed.ndjson
new file mode 100644
index 00000000..7cd53377
--- /dev/null
+++ b/tests/io/mongodb/mixed.ndjson
@@ -0,0 +1,3 @@
+{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}}
+{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]}
+{"_id":3,"name":"Baz","date":{"$date":"2011-01-16T08:00:00Z"}}
diff --git a/tests/io/mongodb/test_copy.py b/tests/io/mongodb/test_copy.py
index 36ac8c1c..fb3932b5 100644
--- a/tests/io/mongodb/test_copy.py
+++ b/tests/io/mongodb/test_copy.py
@@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
     assert cratedb.database.count_records("testdrive.books-relaxed") == 4
 
 
-def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
+def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb):
     """
     Verify MongoDB Extended JSON -> CrateDB data transfer.
     """
@@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
     assert timestamp_type == "bigint"
 
 
+def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
+    """
+    Verify MongoDB Extended JSON -> CrateDB data transfer, which should emit a warning on an invalid record.
+    """
+
+    # Define source and target URLs.
+    json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson"
+    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"
+
+    # Run transfer command.
+    mongodb_copy(json_resource, cratedb_url)
+
+    # Verify metadata in target database.
+    assert cratedb.database.table_exists("testdrive.demo") is True
+    assert cratedb.database.refresh_table("testdrive.demo") is True
+    assert cratedb.database.count_records("testdrive.demo") == 2
+
+    assert "Dynamic nested arrays are not supported" in caplog.text
+
+
 def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
     """
     Verify MongoDB Extended JSON -> CrateDB data transfer.