Skip to content

Commit

Permalink
[data] Add failure retry logic for read_lance (#46976)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Hao Chen <[email protected]>
  • Loading branch information
raulchen authored Aug 6, 2024
1 parent 7731574 commit ecc7085
Showing 1 changed file with 38 additions and 2 deletions.
40 changes: 38 additions & 2 deletions python/ray/data/_internal/datasource/lance_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import numpy as np

from ray.data._internal.util import _check_import
from ray.data._internal.util import _check_import, call_with_retry
from ray.data.block import BlockMetadata
from ray.data.context import DataContext
from ray.data.datasource.datasource import Datasource, ReadTask

if TYPE_CHECKING:
Expand All @@ -17,6 +18,13 @@
class LanceDatasource(Datasource):
"""Lance datasource, for reading Lance dataset."""

# Errors to retry when reading Lance fragments.
READ_FRAGMENTS_ERRORS_TO_RETRY = ["LanceError(IO)"]
# Maximum number of attempts to read Lance fragments.
READ_FRAGMENTS_MAX_ATTEMPTS = 10
# Maximum backoff seconds between attempts to read Lance fragments.
READ_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS = 32

def __init__(
self,
uri: str,
Expand All @@ -38,6 +46,16 @@ def __init__(
self.storage_options = storage_options
self.lance_ds = lance.dataset(uri=uri, storage_options=storage_options)

match = []
match.extend(self.READ_FRAGMENTS_ERRORS_TO_RETRY)
match.extend(DataContext.get_current().retried_io_errors)
self._retry_params = {
"description": "read lance fragments",
"match": match,
"max_attempts": self.READ_FRAGMENTS_MAX_ATTEMPTS,
"max_backoff_s": self.READ_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS,
}

def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
read_tasks = []
for fragments in np.array_split(self.lance_ds.get_fragments(), parallelism):
Expand All @@ -60,9 +78,15 @@ def get_read_tasks(self, parallelism: int) -> List[ReadTask]:
)
scanner_options = self.scanner_options
lance_ds = self.lance_ds
retry_params = self._retry_params

read_task = ReadTask(
lambda f=fragment_ids: _read_fragments(f, lance_ds, scanner_options),
lambda f=fragment_ids: _read_fragments_with_retry(
f,
lance_ds,
scanner_options,
retry_params,
),
metadata,
)
read_tasks.append(read_task)
Expand All @@ -74,6 +98,18 @@ def estimate_inmemory_data_size(self) -> Optional[int]:
return None


def _read_fragments_with_retry(
    fragment_ids,
    lance_ds,
    scanner_options,
    retry_params,
) -> Iterator["pyarrow.Table"]:
    """Invoke ``_read_fragments`` through ``call_with_retry``.

    Args:
        fragment_ids: Forwarded unchanged to ``_read_fragments``.
        lance_ds: Forwarded unchanged to ``_read_fragments``.
        scanner_options: Forwarded unchanged to ``_read_fragments``.
        retry_params: Mapping unpacked as keyword arguments to
            ``call_with_retry``.

    Returns:
        Whatever ``call_with_retry`` returns for the wrapped call.

    NOTE(review): only the call to ``_read_fragments`` itself is wrapped.
    If ``_read_fragments`` is a generator function, errors raised while the
    result is being iterated would occur after this function has returned
    and would presumably not be retried -- confirm against its body.
    """

    def _attempt():
        # Zero-argument thunk so the retry helper can re-issue the same read.
        return _read_fragments(fragment_ids, lance_ds, scanner_options)

    return call_with_retry(_attempt, **retry_params)


def _read_fragments(
fragment_ids,
lance_ds,
Expand Down

0 comments on commit ecc7085

Please sign in to comment.