Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IO: Improve BulkProcessor when running per-record operations #286

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@


## Unreleased
- IO: Improved `BulkProcessor` when running per-record operations by
also checking `rowcount` for handling `INSERT OK, 0 rows` responses
- MongoDB: Fixed BSON decoding of `{"$date": 1180690093000}` timestamps
by updating to commons-codec 0.0.21.

## 2024/10/01 v0.0.27
- MongoDB: Updated to pymongo 4.9
Expand Down
23 changes: 15 additions & 8 deletions cratedb_toolkit/io/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
return []
errors: t.List[t.Dict[str, t.Any]] = []
for record, status in zip(self.parameters, self.cratedb_bulk_result):
if status["rowcount"] == -2:
if status["rowcount"] != 1:
errors.append(record)
return errors

Expand Down Expand Up @@ -143,12 +143,17 @@
try:
cursor = self.connection.execute(statement=statement, parameters=operation.parameters)
self.connection.commit()
cratedb_bulk_result = getattr(cursor.context, "last_executemany_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
self._metrics.count_success_total += bulk_response.success_count
self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)
if cursor.rowcount > 0:
cratedb_bulk_result = getattr(cursor.context, "last_result", None)
bulk_response = BulkResponse(operation.parameters, cratedb_bulk_result)
failed_records = bulk_response.failed_records
count_success_local = bulk_response.success_count
self._metrics.count_success_total += bulk_response.success_count
self.progress_bar and self.progress_bar.update(n=bulk_response.success_count)
else:
failed_records = operation.parameters
count_success_local = 0
self.progress_bar and self.progress_bar.update(n=1)

Check warning on line 156 in cratedb_toolkit/io/core.py

View check run for this annotation

Codecov / codecov/patch

cratedb_toolkit/io/core.py#L154-L156

Added lines #L154 - L156 were not covered by tests

# When a batch is of size one, an exception is raised.
# Just signal the same condition as if a batch would have failed.
Expand All @@ -165,8 +170,10 @@
)
for record in failed_records:
try:
self.connection.execute(statement=statement, parameters=record)
cursor = self.connection.execute(statement=statement, parameters=record)
self.connection.commit()
if cursor.rowcount != 1:
raise IOError("Record has not been processed")
self._metrics.count_success_total += 1
except Exception as ex:
logger.error(f"Operation failed: {ex}")
Expand Down
3 changes: 0 additions & 3 deletions cratedb_toolkit/io/dynamodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from cratedb_toolkit.io.core import BulkProcessor
from cratedb_toolkit.io.dynamodb.adapter import DynamoDBAdapter
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter
from cratedb_toolkit.util.data import asbool

Expand All @@ -30,8 +29,6 @@ def __init__(
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

cratedb_address = DatabaseAddress.from_string(cratedb_url)
cratedb_sqlalchemy_url, cratedb_table_address = cratedb_address.decode()
cratedb_table = cratedb_table_address.fullname
Expand Down
3 changes: 0 additions & 3 deletions cratedb_toolkit/io/mongodb/copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from cratedb_toolkit.io.mongodb.adapter import mongodb_adapter_factory
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.sqlalchemy.patch import monkeypatch_executemany
from cratedb_toolkit.util import DatabaseAdapter

logger = logging.getLogger(__name__)
Expand All @@ -33,8 +32,6 @@ def __init__(
progress: bool = False,
debug: bool = True,
):
monkeypatch_executemany()

self.mongodb_uri = URL(mongodb_url)
self.cratedb_uri = URL(cratedb_url)

Expand Down
Empty file.
19 changes: 0 additions & 19 deletions cratedb_toolkit/sqlalchemy/patch.py

This file was deleted.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ dependencies = [
"python-dotenv<2",
"python-slugify<9",
"pyyaml<7",
"sqlalchemy-cratedb>=0.37,<1",
"sqlalchemy-cratedb>=0.40,<1",
"sqlparse<0.6",
"tqdm<5",
'typing-extensions<5; python_version <= "3.7"',
Expand Down Expand Up @@ -167,7 +167,7 @@ kinesis = [
"lorrystream[carabas]>=0.0.6",
]
mongodb = [
"commons-codec[mongodb,zyp]>=0.0.20",
"commons-codec[mongodb,zyp]>=0.0.21",
"cratedb-toolkit[io]",
"orjson<4,>=3.3.1",
"pymongo<4.10,>=3.10.1",
Expand Down
44 changes: 43 additions & 1 deletion tests/io/mongodb/test_copy.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ def test_mongodb_copy_filesystem_bson(caplog, cratedb):
assert timestamp_type == "bigint"


def test_mongodb_copy_http_json_relaxed(caplog, cratedb):
def test_mongodb_copy_http_json_relaxed_books(caplog, cratedb):
"""
Verify MongoDB Extended JSON -> CrateDB data transfer, when source file is on HTTP.
"""
Expand Down Expand Up @@ -355,3 +355,45 @@ def test_mongodb_copy_http_json_relaxed(caplog, cratedb):
"Charlie Collins",
"Robi Sen",
]


def test_mongodb_copy_http_json_relaxed_products(caplog, cratedb):
    """
    Check that MongoDB Extended JSON served over HTTP lands correctly in CrateDB.

    The `datasets/products.json` fixture includes one invalid record, which
    must surface in the bulk processor's error counter rather than abort the
    whole transfer.
    """

    # Source (HTTP-hosted Extended JSON) and target (CrateDB table) addresses.
    source_url = "https+bson://github.com/ozlerhakan/mongodb-json-files/raw/master/datasets/products.json"
    target_url = f"{cratedb.get_connection_url()}/testdrive/demo"

    # jq program normalizing fields with heterogeneous types before insert.
    jq_program = """
    .[] |= (
        select(true)
        | if (.for) then .for |= to_array end
        | if (.type) then .type |= to_array end
        | if (.limits.data.n) then .limits.data.n |= tostring end
        | if (.limits.sms.n) then .limits.sms.n |= tostring end
        | if (.limits.voice.n) then .limits.voice.n |= tostring end
    )
    """

    # Assemble the transformation pipeline and run the transfer.
    collection_tx = CollectionTransformation(
        address=CollectionAddress(container="datasets", name="products"),
        pre=MokshaTransformation().jq(jq_program),
    )
    transformation = TransformationProject().add(collection_tx)
    mongodb_copy(source_url, target_url, transformation=transformation)

    # Inspect table metadata on the target database.
    assert cratedb.database.table_exists("testdrive.demo") is True
    assert cratedb.database.refresh_table("testdrive.demo") is True
    assert cratedb.database.count_records("testdrive.demo") == 10

    # Spot-check one converged record.
    results = cratedb.database.run_sql("SELECT * FROM testdrive.demo WHERE data['_id'] = 'ac3';", records=True)
    assert results[0]["data"]["name"] == "AC3 Phone"

    # Exactly one record is invalid, so metrics must report a single error.
    assert "Bulk processor metrics: BulkMetrics(count_success_total=10, count_error_total=1" in caplog.text