-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
MongoDB: Improve error handling wrt. bulk operations vs. usability
In order to have both, efficient bulk insert operations, and on-the-spot error messages on records that fail to insert, let's introduce a two-stage approach: First, try to insert a batch. When it fails, determine invalid records and insert them one-by-one, in order to relay corresponding error messages to the user.
- Loading branch information
Showing
9 changed files
with
251 additions
and
46 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
# TODO: Maybe refactor to `sqlalchemy-cratedb` or `commons-codec` on another iteration? | ||
import typing as t | ||
from functools import cached_property | ||
|
||
import sqlalchemy as sa | ||
from attr import Factory | ||
from attrs import define | ||
from commons_codec.model import SQLOperation | ||
from pympler.asizeof import asizeof | ||
from sqlalchemy.exc import ProgrammingError | ||
from tqdm import tqdm | ||
|
||
from cratedb_toolkit.util.database import logger | ||
|
||
|
||
class BulkResultItem(t.TypedDict): | ||
""" | ||
Define the shape of a CrateDB bulk request response item. | ||
""" | ||
|
||
rowcount: int | ||
|
||
|
||
@define | ||
class BulkResponse: | ||
""" | ||
Manage CrateDB bulk request responses. | ||
Accepts a list of bulk arguments (parameter list) and a list of bulk response items. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations | ||
TODO: Think about refactoring this to `sqlalchemy_cratedb.support`. | ||
""" | ||
|
||
parameters: t.Union[t.List[t.Dict[str, t.Any]], None] | ||
cratedb_bulk_result: t.Union[t.List[BulkResultItem], None] | ||
|
||
@cached_property | ||
def failed_records(self) -> t.List[t.Dict[str, t.Any]]: | ||
""" | ||
Compute list of failed records. | ||
CrateDB signals failed insert using `rowcount=-2`. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#error-handling | ||
""" | ||
if self.parameters is None or self.cratedb_bulk_result is None: | ||
return [] | ||
errors: t.List[t.Dict[str, t.Any]] = [] | ||
for record, status in zip(self.parameters, self.cratedb_bulk_result): | ||
if status["rowcount"] == -2: | ||
errors.append(record) | ||
return errors | ||
|
||
@cached_property | ||
def parameter_count(self) -> int: | ||
""" | ||
Compute bulk size / length of parameter list. | ||
""" | ||
if not self.parameters: | ||
return 0 | ||
return len(self.parameters) | ||
|
||
@cached_property | ||
def success_count(self) -> int: | ||
""" | ||
Compute number of succeeding records within a batch. | ||
""" | ||
return self.parameter_count - self.failed_count | ||
|
||
@cached_property | ||
def failed_count(self) -> int: | ||
""" | ||
Compute number of failed records within a batch. | ||
""" | ||
return len(self.failed_records) | ||
|
||
|
||
@define | ||
class BulkMetrics: | ||
""" | ||
Manage a few details for a `BulkProcessor` task. | ||
""" | ||
|
||
count_success_total: int = 0 | ||
count_error_total: int = 0 | ||
bytes_write_total: int = 0 | ||
bytes_error_total: int = 0 | ||
rate_current: int = 0 | ||
rate_max: int = 0 | ||
|
||
|
||
@define | ||
class BulkProcessor: | ||
""" | ||
Generic driver to run a bulk operation against CrateDB, which can fall back to record-by-record operation. | ||
It aims to provide a combination of both performance/efficiency by using bulk operations, | ||
and also good usability and on-the-spot error message for records that fail to insert. | ||
Background: This is a canonical client-side API wrapper for CrateDB's bulk operations HTTP endpoint. | ||
https://cratedb.com/docs/crate/reference/en/latest/interfaces/http.html#bulk-operations | ||
""" | ||
|
||
connection: sa.Connection | ||
data: t.Iterable[t.List[t.Dict[str, t.Any]]] | ||
batch_to_operation: t.Callable[[t.List[t.Dict[str, t.Any]]], SQLOperation] | ||
progress_bar: t.Union[tqdm, None] = None | ||
on_error: t.Literal["ignore", "raise"] = "ignore" | ||
debug: bool = False | ||
|
||
_metrics: BulkMetrics = Factory(BulkMetrics) | ||
|
||
@cached_property | ||
def log_level(self): | ||
if self.debug: | ||
return logger.exception | ||
else: | ||
return logger.warning | ||
|
||
def start(self) -> BulkMetrics: | ||
# Acquire batches of documents, convert to SQL operations, and submit to CrateDB. | ||
for batch in self.data: | ||
current_batch_size = len(batch) | ||
|
||
self.progress_bar and self.progress_bar.set_description("ACQUIRE") | ||
|
||
try: | ||
operation = self.batch_to_operation(batch) | ||
except Exception as ex: | ||
self.log_level(f"Computing query failed: {ex}") | ||
if self.on_error == "raise": | ||
raise | ||
continue | ||
|
||
self._metrics.bytes_write_total += asizeof(operation) | ||
statement = sa.text(operation.statement) | ||
|
||
# Submit operation to CrateDB, using `bulk_args`. | ||
self.progress_bar and self.progress_bar.set_description("SUBMIT ") | ||
try: | ||
cursor = self.connection.execute(statement=statement, parameters=operation.parameters) | ||
self.connection.commit() | ||
cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None) | ||
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result) | ||
failed_records = bulk_response.failed_records | ||
count_success_local = bulk_response.success_count | ||
self._metrics.count_success_total += bulk_response.success_count | ||
self.progress_bar and self.progress_bar.update(n=bulk_response.success_count) | ||
|
||
# When a batch is of size one, an exception is raised. | ||
# Just signal the same condition as if a batch would have failed. | ||
except ProgrammingError: | ||
failed_records = [operation.parameters] | ||
count_success_local = 0 | ||
|
||
# When bulk operations fail, try inserting failed records record-by-record, | ||
# in order to relay proper error messages to the user. | ||
if failed_records: | ||
logger.warning( | ||
f"Incomplete batch. Records processed: {count_success_local}/{current_batch_size}. " | ||
f"Falling back to per-record operations." | ||
) | ||
for record in failed_records: | ||
try: | ||
self.connection.execute(statement=statement, parameters=record) | ||
self.connection.commit() | ||
self._metrics.count_success_total += 1 | ||
except Exception as ex: | ||
logger.warning(f"Operation failed: {ex}") | ||
logger.debug(f"Failing record: {record}") | ||
self._metrics.count_error_total += 1 | ||
self._metrics.bytes_error_total += asizeof(record) | ||
if self.on_error == "raise": | ||
raise | ||
self.progress_bar and self.progress_bar.update(n=1) | ||
|
||
self.progress_bar and self.progress_bar.close() | ||
|
||
return self._metrics |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from sqlalchemy_cratedb import dialect | ||
|
||
|
||
def do_executemany(self, cursor, statement, parameters, context=None): | ||
""" | ||
Improved version of `do_executemany` that stores its response into the request context instance. | ||
TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`. | ||
""" | ||
result = cursor.executemany(statement, parameters) | ||
if context is not None: | ||
context.last_executemany_result = result | ||
|
||
|
||
def monkeypatch_executemany(): | ||
""" | ||
Enable improved version of `do_executemany`. | ||
""" | ||
dialect.do_executemany = do_executemany |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}} | ||
{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]} | ||
{"_id":3,"name":"Baz","date":{"$date":"2011-01-16T08:00:00Z"}} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters