MongoDB: Improve error handling wrt. bulk operations vs. usability #262
Conversation
from sqlalchemy_cratedb import dialect


def do_executemany(self, cursor, statement, parameters, context=None):
    """
    Improved version of `do_executemany` that stores its response into the request context instance.

    TODO: Refactor this to `sqlalchemy_cratedb.CrateDialect`.
    """
    result = cursor.executemany(statement, parameters)
    if context is not None:
        context.last_executemany_result = result


def monkeypatch_executemany():
    """
    Enable improved version of `do_executemany`.
    """
    dialect.do_executemany = do_executemany
That code needs to go into the SQLAlchemy dialect package in a later iteration. It sets the stage to relay CrateDB's bulk operations response through SQLAlchemy's ExecutionContext to downstream consumers.
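For illustration, a minimal sketch of how a downstream consumer could pick up the relayed response once the monkeypatch is active; the engine URL, table, and records here are made up for this example, not part of the patch:

import sqlalchemy as sa

# Activate the improved `do_executemany` from the snippet above.
monkeypatch_executemany()

engine = sa.create_engine("crate://localhost:4200")
with engine.connect() as connection:
    result = connection.execute(
        sa.text("INSERT INTO testdrive.demo (id, name) VALUES (:id, :name)"),
        [{"id": 1, "name": "Foo"}, {"id": 2, "name": "Bar"}],
    )
    connection.commit()
    # The bulk operations response relayed through the execution context.
    bulk_result = getattr(result.context, "last_executemany_result", None)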
# Submit operation to CrateDB, using `bulk_args`.
self.progress_bar and self.progress_bar.set_description("SUBMIT ")
try:
    cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
    self.connection.commit()
    cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
    bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
    failed_records = bulk_response.failed_records
    count_success_local = bulk_response.success_count
    self._metrics.count_success_total += bulk_response.success_count
    self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)
This is the spot where the bulk operations response is consumed and evaluated, by correlating the ingress parameters (batch of records) against the bulk response's status items, {"rowcount": 1} vs. {"rowcount": -2}, in order to discover which records effectively failed.
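A hedged sketch of that correlation logic; the actual BulkResponse implementation in this patch may differ in details such as edge-case handling:

from typing import Any, Dict, List, Optional


class BulkResponse:
    """
    Correlate the ingress parameters (batch of records) with CrateDB's bulk
    operations response, where {"rowcount": 1} marks a successful record and
    {"rowcount": -2} marks a failed one.
    """

    def __init__(
        self,
        parameters: List[Dict[str, Any]],
        cratedb_bulk_result: Optional[List[Dict[str, int]]],
    ):
        self.parameters = parameters
        self.cratedb_bulk_result = cratedb_bulk_result or []

    @property
    def failed_records(self) -> List[Dict[str, Any]]:
        # Pair each ingress record with its status item; a rowcount of -2
        # signals that CrateDB rejected the record.
        return [
            record
            for record, status in zip(self.parameters, self.cratedb_bulk_result)
            if status.get("rowcount") == -2
        ]

    @property
    def success_count(self) -> int:
        return len(self.parameters) - len(self.failed_records)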
Nice idea, looks good, but isn't this missing a test case? Or how do you ensure that it's working as expected?
tests/io/mongodb/test_copy.py
def test_mongodb_copy_filesystem_json_relaxed_warning(caplog, cratedb):
    """
    Verify MongoDB Extended JSON -> CrateDB data transfer, which should emit a warning on an invalid record.
    """

    # Define source and target URLs.
    json_resource = "file+bson:./tests/io/mongodb/mixed.ndjson"
    cratedb_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # Run transfer command.
    mongodb_copy(json_resource, cratedb_url)

    # Verify metadata in target database.
    assert cratedb.database.table_exists("testdrive.demo") is True
    assert cratedb.database.refresh_table("testdrive.demo") is True
    assert cratedb.database.count_records("testdrive.demo") == 1

    assert "Dynamic nested arrays are not supported" in caplog.text
The test case is here. Capturing the warning validated by the assertion in the last line of this block would otherwise not be possible; that capability hasn't been there before.
The test case ingests two records in MongoDB Extended JSON format from mixed.ndjson, where one of them is invalid because it includes a nested array. Thus, besides checking the log output, the test case also validates that the target database table in CrateDB includes exactly one record.
It is just a basic test case, but it validates the relevant improvements reasonably well, hitting CrateDB end-to-end and provoking it to raise a "Dynamic nested arrays are not supported" exception.
I've just amended the patch so the mixed.ndjson file includes three records, where a single one is invalid, like this:
{"_id":1,"name":"Foo","date":{"$date":"2011-01-14T08:00:00Z"}}
{"_id":2,"name":"Bar","date":{"$date":"2011-01-15T08:00:00Z"},"nested_array":[[1,2]]}
{"_id":3,"name":"Baz","date":{"$date":"2011-01-16T08:00:00Z"}}
That aims to validate that processing does not stop on invalid records.
👍 Thanks for clarifying, didn't look carefully enough.
In order to have both efficient bulk insert operations and on-the-spot error messages about records that fail to insert, let's introduce a two-stage approach: first, try to insert a batch; when it fails, determine the invalid records and insert them one by one, in order to relay the corresponding error messages to the user.
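A sketch of the two-stage idea, assuming generic insert_batch/insert_one callables that raise on failure; the names are illustrative, not the actual API of this patch:

import logging
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def insert_two_stage(
    records: List[Dict[str, Any]],
    insert_batch: Callable[[List[Dict[str, Any]]], None],
    insert_one: Callable[[Dict[str, Any]], None],
) -> List[Dict[str, Any]]:
    """
    Stage one: submit the whole batch at once. Stage two: when the batch
    fails, insert records one by one to determine the invalid ones and
    relay their error messages to the user.
    """
    failed: List[Dict[str, Any]] = []
    try:
        insert_batch(records)
    except Exception:
        for record in records:
            try:
                insert_one(record)
            except Exception as ex:
                logger.error(f"Inserting record failed: {ex}. Record: {record}")
                failed.append(record)
    return failed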
About
In order to have both efficient bulk insert operations and on-the-spot error messages about records that fail to insert, let's introduce a two-stage approach: first, try to insert a batch; when it fails, determine the invalid records and insert them one by one, in order to relay the corresponding error messages to the user.
Details
BulkProcessor rips the canonical implementation out of the MongoDB-specific I/O handler, setting the stage to reuse it on other I/O subsystems as well. Improved error handling has already been requested for the DynamoDB table loader, so applying the outcome there will certainly be next.
References
/cc @juanpardo, @hlcianfagna, @hammerhead, @wierdvanderhaar