Skip to content

Commit

Permalink
Refactor retry during migration (#706)
Browse files Browse the repository at this point in the history
* new: retry mechanism in migrate

* fix: accidental deletion of points during merging

* fix: minor type hint update

---------

Co-authored-by: George Panchuk <[email protected]>
  • Loading branch information
hh-space-invader and joein committed Aug 9, 2024
1 parent 5292407 commit 35834ff
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions qdrant_client/migrate/migrate.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,37 @@
from typing import Dict, List, Optional
import time
from typing import Dict, Iterable, List, Optional

from qdrant_client._pydantic_compat import to_dict
from qdrant_client.client_base import QdrantBase
from qdrant_client.http import models


def upload_with_retry(
    client: "QdrantBase",
    collection_name: str,
    points: "Iterable[models.PointStruct]",
    max_attempts: int = 3,
    pause: float = 3.0,
) -> None:
    """Upload points to a collection, retrying on transient failures.

    Args:
        client: Destination client that receives the points.
        collection_name: Name of the target collection.
        points: Points to upload. The iterable is materialized once up
            front so that every retry re-sends the complete batch — a lazy
            generator would be partially (or fully) consumed by a failed
            first attempt, silently dropping points on retry.
        max_attempts: Total number of upload attempts before giving up.
        pause: Seconds to sleep between consecutive attempts.

    Raises:
        Exception: If all ``max_attempts`` attempts fail.
    """
    # Materialize once: retrying with an already-consumed iterator would
    # upload fewer (or zero) points without any error being raised.
    point_list = list(points)
    for attempt in range(1, max_attempts + 1):
        try:
            client.upload_points(
                collection_name=collection_name,
                points=point_list,
                wait=True,
            )
            return
        except Exception as e:
            print(f"Exception: {e}, attempt {attempt}/{max_attempts}")
            # Only pause when another attempt will actually follow.
            if attempt < max_attempts:
                print(f"Next attempt in {pause} seconds")
                time.sleep(pause)

    raise Exception(f"Failed to upload points after {max_attempts} attempts")


def migrate(
source_client: QdrantBase,
dest_client: QdrantBase,
Expand Down Expand Up @@ -130,15 +157,15 @@ def _migrate_collection(
batch_size (int, optional): Batch size for scrolling and uploading vectors. Defaults to 100.
"""
records, next_offset = source_client.scroll(collection_name, limit=2, with_vectors=True)
dest_client.upload_points(collection_name, records, wait=True) # type: ignore
upload_with_retry(client=dest_client, collection_name=collection_name, points=records) # type: ignore
# upload_records has been deprecated due to the usage of models.Record; models.Record has been deprecated as a
# structure for uploading due to a `shard_key` field, and now is used only as a result structure.
# since shard_keys are not supported in migration, we can safely type ignore here and use Records for uploading
while next_offset is not None:
records, next_offset = source_client.scroll(
collection_name, offset=next_offset, limit=batch_size, with_vectors=True
)
dest_client.upload_points(collection_name, records, wait=True) # type: ignore
upload_with_retry(client=dest_client, collection_name=collection_name, points=records) # type: ignore
source_client_vectors_count = source_client.count(collection_name).count
dest_client_vectors_count = dest_client.count(collection_name).count
assert (
Expand Down

0 comments on commit 35834ff

Please sign in to comment.