Attempt to log the file that is failing during xarray open
moradology committed Jun 12, 2024
1 parent a851610 commit c306c95
Showing 2 changed files with 15 additions and 5 deletions.
13 changes: 10 additions & 3 deletions pangeo_forge_recipes/openers.py
@@ -1,6 +1,7 @@
 """Standalone functions for opening sources as Dataset objects."""
 
 import io
+import logging
 import tempfile
 import warnings
 from typing import Dict, Optional, Union
@@ -12,6 +13,7 @@
 from .patterns import FileType
 from .storage import CacheFSSpecTarget, OpenFileType, _copy_btw_filesystems, _get_opener
 
+logger = logging.getLogger(__name__)
 
 def open_url(
     url: str,
@@ -232,13 +234,18 @@ def open_with_xarray(
         tmp_name = ntf.name
         target_opener = open(tmp_name, mode="wb")
         _copy_btw_filesystems(url_or_file_obj, target_opener)
+        file_url_origin = url_or_file_obj
         url_or_file_obj = tmp_name
 
     url_or_file_obj = _preprocess_url_or_file_obj(url_or_file_obj, file_type)
 
-    ds = xr.open_dataset(url_or_file_obj, **kw)
-    if load:
-        ds.load()
+    try:
+        ds = xr.open_dataset(url_or_file_obj, **kw)
+        if load:
+            ds.load()
+    except Exception as e:
+        logger.debug(f"URL failing to open with Xarray: {file_url_origin}")
+        raise e
 
     if copy_to_local and not load:
         warnings.warn(
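Note: the new message is emitted at DEBUG level on the module logger created by logging.getLogger(__name__), i.e. "pangeo_forge_recipes.openers", so it only appears when that logger is configured to show debug output. A minimal sketch of how a recipe author might surface it (standard-library logging only; the handler setup below is illustrative and not part of this commit):

import logging

# Show DEBUG records from the opener module so the
# "URL failing to open with Xarray: ..." message is visible
# alongside the re-raised exception.
logging.basicConfig(level=logging.INFO)
logging.getLogger("pangeo_forge_recipes.openers").setLevel(logging.DEBUG)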
7 changes: 5 additions & 2 deletions pangeo_forge_recipes/transforms.py
@@ -162,10 +162,12 @@ class TransferFilesWithConcurrency(beam.DoFn):
     secrets: Optional[Dict] = None
     open_kwargs: Optional[Dict] = None
     fsspec_sync_patch: bool = False
-    max_retries: int = 5
-    initial_backoff: float = 1.0
+    max_retries: int = 3
+    initial_backoff: float = 2.0
     backoff_factor: float = 2.0
 
+    # TODO: Fall back to lower concurrency per-worker rather than simply retrying per request
+    # TODO: Make sure to keep track of what has succeeded so as not to duplicate reads/writes
     def process(self, indexed_urls):
         with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
             futures = {
@@ -204,6 +206,7 @@ def transfer_file(self, index: Index, url: str) -> Tuple[Index, str]:
         raise RuntimeError(f"Failed to transfer file {url} after {self.max_retries} attempts.")
 
 
+#TODO: MAKE SURE ALL URLS PUT IN COME OUT
 @dataclass
 class CheckpointFileTransfer(beam.PTransform):
     """
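With the revised defaults (max_retries=3, initial_backoff=2.0, backoff_factor=2.0), a failing transfer would wait roughly 2 s and then 4 s between attempts before the RuntimeError above is raised. A rough sketch of that backoff pattern, assuming a simple retry loop (illustrative only, not the actual transfer_file body; the helper name is hypothetical):

import time

def transfer_with_backoff(transfer, url, max_retries=3, initial_backoff=2.0, backoff_factor=2.0):
    # Retry transfer(url) with exponential backoff: sleep 2 s, then 4 s,
    # and give up after max_retries attempts.
    backoff = initial_backoff
    for attempt in range(1, max_retries + 1):
        try:
            return transfer(url)
        except Exception:
            if attempt == max_retries:
                raise RuntimeError(f"Failed to transfer file {url} after {max_retries} attempts.")
            time.sleep(backoff)
            backoff *= backoff_factor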
