Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MongoDB: Improve error handling wrt. bulk operations vs. usability #262

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
or from filesystem directory
- MongoDB: Unlock processing JSON files from HTTP resource, using `https+bson://`
- MongoDB: Optionally filter server collection using MongoDB query expression
- MongoDB: Improve error handling wrt. bulk operations vs. usability

## 2024/09/10 v0.0.22
- MongoDB: Rename columns with leading underscores to use double leading underscores
Expand Down
180 changes: 180 additions & 0 deletions cratedb_toolkit/io/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration?
import typing as t
from functools import cached_property

import sqlalchemy as sa
from attr import Factory
from attrs import define
from commons_codec.model import SQLOperation
from pympler.asizeof import asizeof
from sqlalchemy.exc import ProgrammingError
from tqdm import tqdm

from cratedb_toolkit.util.database import logger


class BulkResultItem(t.TypedDict):
    """
    Define the shape of a CrateDB bulk request response item.
    """

    # Number of affected rows for this record; `-2` marks a failed insert
    # (see `BulkResponse.failed_records`).
    rowcount: int


@define
class BulkResponse:
    """
    Manage CrateDB bulk request responses.
    Accepts a list of bulk arguments (parameter list) and a list of bulk response items.

    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations

    TODO: Think about refactoring this to `sqlalchemy_cratedb.support`.
    """

    # Batch of records submitted to the bulk endpoint; `None` when unknown.
    parameters: t.Union[t.List[t.Dict[str, t.Any]], None]
    # Per-record response items returned by CrateDB; `None` when unavailable.
    cratedb_bulk_result: t.Union[t.List[BulkResultItem], None]

    @cached_property
    def failed_records(self) -> t.List[t.Dict[str, t.Any]]:
        """
        Compute list of failed records.

        CrateDB signals a failed insert using `rowcount=-2`.

        https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling
        """
        if self.parameters is None or self.cratedb_bulk_result is None:
            return []
        # Correlate each ingress record with its response item. Note that `zip`
        # truncates to the shorter sequence if the server returned fewer items.
        return [
            record
            for record, status in zip(self.parameters, self.cratedb_bulk_result)
            if status["rowcount"] == -2
        ]

    @cached_property
    def parameter_count(self) -> int:
        """
        Compute bulk size / length of parameter list.
        """
        if not self.parameters:
            return 0
        return len(self.parameters)

    @cached_property
    def success_count(self) -> int:
        """
        Compute number of succeeding records within a batch.
        """
        return self.parameter_count - self.failed_count

    @cached_property
    def failed_count(self) -> int:
        """
        Compute number of failed records within a batch.
        """
        return len(self.failed_records)


@define
class BulkMetrics:
    """
    Manage a few details for a `BulkProcessor` task.
    """

    # Total number of records successfully written.
    count_success_total: int = 0
    # Total number of records that failed to insert.
    count_error_total: int = 0
    # Accumulated size (per `pympler.asizeof`) of submitted operations.
    bytes_write_total: int = 0
    # Accumulated size (per `pympler.asizeof`) of failed records.
    bytes_error_total: int = 0
    # Throughput counters; not updated by `BulkProcessor.start` as visible
    # here — presumably reserved for future use, TODO confirm.
    rate_current: int = 0
    rate_max: int = 0


@define
class BulkProcessor:
    """
    Generic driver to run a bulk operation against CrateDB, which can fall back to record-by-record operation.

    It aims to provide a combination of both performance/efficiency by using bulk operations,
    and also good usability and on-the-spot error messages for records that fail to insert.

    Background: This is a canonical client-side API wrapper for CrateDB's bulk operations HTTP endpoint.
    https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations
    """

    # Open SQLAlchemy connection to CrateDB.
    connection: sa.Connection
    # Iterable producing batches of records.
    data: t.Iterable[t.List[t.Dict[str, t.Any]]]
    # Callback converting one batch of records into a single SQL operation.
    batch_to_operation: t.Callable[[t.List[t.Dict[str, t.Any]]], SQLOperation]
    # Optional progress bar; all progress calls are guarded, so `None` is fine.
    progress_bar: t.Union[tqdm, None] = None
    # Whether to re-raise after logging, or continue with the next record/batch.
    on_error: t.Literal["ignore", "raise"] = "ignore"
    # When enabled, log failures with full tracebacks.
    debug: bool = False

    _metrics: BulkMetrics = Factory(BulkMetrics)

    @cached_property
    def log_level(self):
        """
        Select the logging callable for failures.

        `logger.exception` includes the traceback, so it is only used in debug mode.
        """
        if self.debug:
            return logger.exception
        else:
            return logger.warning

    def start(self) -> BulkMetrics:
        """
        Acquire batches of documents, convert to SQL operations, and submit to CrateDB.

        Returns the accumulated `BulkMetrics` for the whole run.
        """
        for batch in self.data:
            current_batch_size = len(batch)

            if self.progress_bar:
                self.progress_bar.set_description("ACQUIRE")

            try:
                operation = self.batch_to_operation(batch)
            except Exception as ex:
                self.log_level(f"Computing query failed: {ex}")
                if self.on_error == "raise":
                    raise
                continue

            self._metrics.bytes_write_total += asizeof(operation)
            statement = sa.text(operation.statement)

            # Submit operation to CrateDB, using `bulk_args`.
            if self.progress_bar:
                self.progress_bar.set_description("SUBMIT ")
            try:
                cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
                self.connection.commit()
                # Relayed through the monkeypatched `do_executemany`, see
                # `cratedb_toolkit.sqlalchemy.patch`. `None` when unavailable.
                cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
                bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
                failed_records = bulk_response.failed_records
                count_success_local = bulk_response.success_count
                self._metrics.count_success_total += bulk_response.success_count
                if self.progress_bar:
                    self.progress_bar.update(n=bulk_response.success_count)

            # When a batch is of size one, an exception is raised.
            # Just signal the same condition as if a batch would have failed.
            except ProgrammingError:
                failed_records = [operation.parameters]
                count_success_local = 0

            # When bulk operations fail, try inserting failed records record-by-record,
            # in order to relay proper error messages to the user.
            # NOTE(review): the connection is not rolled back after the failed bulk
            # operation before retrying — presumably safe with CrateDB's semantics,
            # but worth confirming.
            if failed_records:
                logger.warning(
                    f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. "
                    f"Falling back to per-record operations."
                )
                for record in failed_records:
                    try:
                        self.connection.execute(statement=statement, parameters=record)
                        self.connection.commit()
                        self._metrics.count_success_total += 1
                    except Exception as ex:
                        logger.warning(f"Operation failed: {ex}")
                        logger.debug(f"Failing record: {record}")
                        self._metrics.count_error_total += 1
                        self._metrics.bytes_error_total += asizeof(record)
                        if self.on_error == "raise":
                            raise
                    if self.progress_bar:
                        self.progress_bar.update(n=1)

        if self.progress_bar:
            self.progress_bar.close()

        return self._metrics
4 changes: 2 additions & 2 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@
for task in tasks:
try:
outcome_task = task.start()
except (Exception, PanicException):
logger.exception("Task failed")
except (Exception, PanicException) as ex:
logger.error(f"Task failed: {ex}")

Check warning on line 172 in cratedb_toolkit/io/mongodb/api.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/mongodb/api.py#L171-L172

Added lines #L171 - L172 were not covered by tests
outcome_task = False
outcome = outcome and outcome_task

Expand Down
67 changes: 24 additions & 43 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@
from tqdm.contrib.logging import logging_redirect_tqdm
from zyp.model.collection import CollectionAddress

from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.export import CrateDBConverter
from cratedb_toolkit.io.mongodb.model import DocumentDict
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -44,9 +46,7 @@ def to_sql(self, data: t.Union[DocumentDict, t.List[DocumentDict]]) -> SQLOperat
"""
Produce CrateDB SQL INSERT batch operation from multiple MongoDB documents.
"""
if isinstance(data, Cursor):
data = list(data)
if not isinstance(data, list):
if not isinstance(data, Cursor) and not isinstance(data, list):
data = [data]

# Define SQL INSERT statement.
Expand All @@ -72,10 +72,12 @@ def __init__(
mongodb_url: t.Union[str, URL],
cratedb_url: t.Union[str, URL],
tm: t.Union[TransformationManager, None],
on_error: t.Literal["ignore", "raise"] = "raise",
on_error: t.Literal["ignore", "raise"] = "ignore",
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Expand Down Expand Up @@ -114,51 +116,30 @@ def start(self):
logger.info(f"Starting MongoDBFullLoad. source={self.mongodb_uri}, target={self.cratedb_uri}")
records_in = self.mongodb_adapter.record_count()
logger.info(f"Source: MongoDB {self.mongodb_adapter.address} count={records_in}")
logger_on_error = logger.warning
if self.debug:
logger_on_error = logger.exception
with self.cratedb_adapter.engine.connect() as connection, logging_redirect_tqdm():
if not self.cratedb_adapter.table_exists(self.cratedb_table):
connection.execute(sa.text(self.translator.sql_ddl))
connection.commit()
records_target = self.cratedb_adapter.count_records(self.cratedb_table)
logger.info(f"Target: CrateDB table={self.cratedb_table} count={records_target}")
progress_bar = tqdm(total=records_in)
records_out: int = 0

# Acquire batches of documents, convert to SQL operations, and submit to CrateDB.
for documents in self.mongodb_adapter.query():
progress_bar.set_description("ACQUIRE")

try:
operation = self.translator.to_sql(documents)
except Exception as ex:
logger_on_error(f"Computing query failed: {ex}")
if self.on_error == "raise":
raise
continue

# Submit operation to CrateDB.
progress_bar.set_description("SUBMIT ")
try:
result = connection.execute(sa.text(operation.statement), operation.parameters)
result_size = result.rowcount
if result_size < 0:
raise IOError("Unable to insert one or more records")
records_out += result_size
progress_bar.update(n=result_size)
except Exception as ex:
logger_on_error(
f"Executing operation failed: {ex}\n"
f"Statement: {operation.statement}\nParameters: {str(operation.parameters)[:500]} [...]"
)
if self.on_error == "raise":
raise
continue

progress_bar.close()
connection.commit()
logger.info(f"Number of records written: {records_out}")
if records_out == 0:

processor = BulkProcessor(
connection=connection,
data=self.mongodb_adapter.query(),
batch_to_operation=self.translator.to_sql,
progress_bar=progress_bar,
on_error=self.on_error,
debug=self.debug,
)
metrics = processor.start()
logger.info(f"Bulk processor metrics: {metrics}")

logger.info(
"Number of records written: "
f"success={metrics.count_success_total}, error={metrics.count_error_total}"
)
if metrics.count_success_total == 0:
logger.warning("No data has been copied")

return True
Empty file.
19 changes: 19 additions & 0 deletions cratedb_toolkit/sqlalchemy/patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from sqlalchemy_cratedb import dialect


def do_executemany(self, cursor, statement, parameters, context=None):
    """
    Improved version of `do_executemany` that stores its response into the request context instance.

    TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
    """
    response = cursor.executemany(statement, parameters)
    if context is None:
        return
    # Make the driver's bulk response available to downstream consumers
    # through SQLAlchemy's ExecutionContext.
    context.last_executemany_result = response


def monkeypatch_executemany():
    """
    Enable improved version of `do_executemany`.
    """
    # Swap the patched handler into the CrateDB dialect module.
    setattr(dialect, "do_executemany", do_executemany)
Comment on lines +1 to +19
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That code needs to go into the SQLAlchemy dialect package on a later iteration. It sets the stage to relay CrateDB's bulk operations response through SQLAlchemy's ExecutionContext to downstream consumers.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ dependencies = [
'importlib-metadata; python_version < "3.8"',
'importlib-resources; python_version < "3.9"',
"polars<1.7",
"pympler<1.2",
"python-dateutil<3",
"python-dotenv<2",
"python-slugify<9",
Expand Down
3 changes: 3 additions & 0 deletions tests/io/mongodb/mixed.ndjson
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}}
{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]}
{"_id":3,"name":"Baz","date":{"$date":"2011-01-16T08:00:00Z"}}
22 changes: 21 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def test_mongodb_copy_filesystem_folder_relative(caplog, cratedb, mongodb):
assert cratedb.database.count_records("testdrive.books-relaxed") == 4


def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
def test_mongodb_copy_filesystem_json_relaxed_success(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
"""
Expand Down Expand Up @@ -187,6 +187,26 @@ def test_mongodb_copy_filesystem_json_relaxed(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
    """
    Verify MongoDB Extended JSON -> CrateDB data transfer, which should emit a warning on an invalid record.
    """

    # Define source and target URLs.
    json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Run transfer command.
    mongodb_copy(json_resource, cratedb_url)

    # Verify metadata in target database.
    assert cratedb.database.table_exists("testdrive.demo") is True
    assert cratedb.database.refresh_table("testdrive.demo") is True
    # `mixed.ndjson` contains three records; the one carrying a nested array
    # is expected to fail, so only two records arrive.
    assert cratedb.database.count_records("testdrive.demo") == 2

    assert "Dynamic nested arrays are not supported" in caplog.text


def test_mongodb_copy_filesystem_json_canonical(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer.
Expand Down