Skip to content

Commit

Permalink
MongoDB: Improve URL computation when transferring whole databases
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Sep 12, 2024
1 parent e129e26 commit 9e52464
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 22 deletions.
42 changes: 20 additions & 22 deletions cratedb_toolkit/io/mongodb/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from cratedb_toolkit.io.mongodb.copy import MongoDBFullLoad
from cratedb_toolkit.io.mongodb.core import export, extract, translate
from cratedb_toolkit.io.mongodb.transform import TransformationManager
from cratedb_toolkit.model import DatabaseAddress
from cratedb_toolkit.model import AddressPair, DatabaseAddress
from cratedb_toolkit.util.cr8 import cr8_insert_json
from cratedb_toolkit.util.database import DatabaseAdapter

Expand Down Expand Up @@ -115,8 +115,7 @@ def mongodb_copy(
if transformation:
tm = TransformationManager(path=transformation)

tasks = []

# Check if source address URL includes a table name or not.
has_table = True
if "*" in source_url.path:
has_table = False
Expand All @@ -125,7 +124,12 @@ def mongodb_copy(
if mongodb_collection_address.table is None:
has_table = False

# Invoke `full-load` procedure.
# Build list of tasks. Either a single one when transferring a single
# collection into a table, or multiple ones when transferring multiple
# collections.
tasks = []

# `full-load` procedure, single collection.
if has_table:
tasks.append(
MongoDBFullLoad(
Expand All @@ -135,32 +139,26 @@ def mongodb_copy(
progress=progress,
)
)

# `full-load` procedure, multiple collections.
else:
logger.info(f"Inquiring collections at {source_url}")
mongodb_uri = source_url
cratedb_uri = target_url
# What the hack?
if (
mongodb_uri.scheme.startswith("mongodb")
and Path(mongodb_uri.path).is_absolute()
and mongodb_uri.path[-1] != "/"
):
mongodb_uri.path += "/"
if cratedb_uri.path[-1] != "/":
cratedb_uri.path += "/"
mongodb_query_parameters = mongodb_uri.query_params
mongodb_adapter = mongodb_adapter_factory(mongodb_uri)
address_pair_root = AddressPair(source_url=source_url, target_url=target_url)

mongodb_adapter = mongodb_adapter_factory(address_pair_root.source_url)
collections = mongodb_adapter.get_collections()
logger.info(f"Discovered collections: {len(collections)}")
logger.debug(f"Processing collections: {collections}")

for collection_path in collections:
mongodb_uri_effective = mongodb_uri.navigate(Path(collection_path).name)
mongodb_uri_effective.query_params = mongodb_query_parameters
cratedb_uri_effective = cratedb_uri.navigate(Path(collection_path).stem)
address_pair = address_pair_root.navigate(
source_path=Path(collection_path).name,
target_path=Path(collection_path).stem,
)
tasks.append(
MongoDBFullLoad(
mongodb_url=mongodb_uri_effective,
cratedb_url=cratedb_uri_effective,
mongodb_url=address_pair.source_url,
cratedb_url=address_pair.target_url,
tm=tm,
progress=progress,
)
Expand Down
44 changes: 44 additions & 0 deletions cratedb_toolkit/model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import dataclasses
import typing as t
from copy import deepcopy
from pathlib import Path

from attr import Factory
from attrs import define
from boltons.urlutils import URL


Expand Down Expand Up @@ -120,3 +123,44 @@ class InputOutputResource:
url: str
format: t.Optional[str] = None # noqa: A003
compression: t.Optional[str] = None


@define
class AddressPair:
"""
Manage two URL instances, specifically a pair of source/target URLs,
where target is mostly a CrateDB Server, while source is any.
"""

source_url: URL
target_url: URL

_source_url_query_parameters: t.Dict[str, t.Any] = Factory(dict)
_target_url_query_parameters: t.Dict[str, t.Any] = Factory(dict)

__SERVER_SCHEMES__ = ["http", "https", "mongodb", "mongodb+srv"]

def navigate(self, source_path: str, target_path: str) -> "AddressPair":
source_url_query_parameters = self.source_url.query_params
target_url_query_parameters = self.target_url.query_params

source_url = URL(str(self.source_url))
target_url = URL(str(self.target_url))

# Q: What the hack?
# A: It makes subsequent `.navigate()` operations work.
if (
source_url.scheme in self.__SERVER_SCHEMES__
and Path(source_url.path).is_absolute()
and source_url.path[-1] != "/"
):
source_url.path += "/"
if target_url.path[-1] != "/":
target_url.path += "/"

source_url = source_url.navigate(f"./{source_path}")
source_url.query_params = source_url_query_parameters
target_url = target_url.navigate(f"./{target_path}")
target_url.query_params = target_url_query_parameters

return AddressPair(source_url, target_url)

0 comments on commit 9e52464

Please sign in to comment.