From c306c9518d92ab2c1e8c816228c28d87f4448888 Mon Sep 17 00:00:00 2001
From: Nathan Zimmerman
Date: Wed, 12 Jun 2024 13:18:06 -0500
Subject: [PATCH] Attempt to log the file that is failing during xarray open

---
 pangeo_forge_recipes/openers.py    | 13 ++++++++++---
 pangeo_forge_recipes/transforms.py |  7 +++++--
 2 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/pangeo_forge_recipes/openers.py b/pangeo_forge_recipes/openers.py
index d419c885..c9641568 100644
--- a/pangeo_forge_recipes/openers.py
+++ b/pangeo_forge_recipes/openers.py
@@ -1,6 +1,7 @@
 """Standalone functions for opening sources as Dataset objects."""
 
 import io
+import logging
 import tempfile
 import warnings
 from typing import Dict, Optional, Union
@@ -12,6 +13,7 @@
 from .patterns import FileType
 from .storage import CacheFSSpecTarget, OpenFileType, _copy_btw_filesystems, _get_opener
 
+logger = logging.getLogger(__name__)
 
 def open_url(
     url: str,
@@ -232,13 +234,18 @@ def open_with_xarray(
         tmp_name = ntf.name
         target_opener = open(tmp_name, mode="wb")
         _copy_btw_filesystems(url_or_file_obj, target_opener)
+        file_url_origin = url_or_file_obj
         url_or_file_obj = tmp_name
 
     url_or_file_obj = _preprocess_url_or_file_obj(url_or_file_obj, file_type)
 
-    ds = xr.open_dataset(url_or_file_obj, **kw)
-    if load:
-        ds.load()
+    try:
+        ds = xr.open_dataset(url_or_file_obj, **kw)
+        if load:
+            ds.load()
+    except Exception as e:
+        logger.debug(f"URL failing to open with Xarray: {file_url_origin}")
+        raise e
 
     if copy_to_local and not load:
         warnings.warn(
diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py
index 52beb049..aa10862a 100644
--- a/pangeo_forge_recipes/transforms.py
+++ b/pangeo_forge_recipes/transforms.py
@@ -162,10 +162,12 @@ class TransferFilesWithConcurrency(beam.DoFn):
     secrets: Optional[Dict] = None
     open_kwargs: Optional[Dict] = None
     fsspec_sync_patch: bool = False
-    max_retries: int = 5
-    initial_backoff: float = 1.0
+    max_retries: int = 3
+    initial_backoff: float = 2.0
     backoff_factor: float = 2.0
 
+    # TODO: Fall back to lower concurrency per-worker rather than simply retrying per request
+    # TODO: Make sure to keep track of what has succeeded so as not to duplicate reads/writes
     def process(self, indexed_urls):
         with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor:
             futures = {
@@ -204,6 +206,7 @@ def transfer_file(self, index: Index, url: str) -> Tuple[Index, str]:
         raise RuntimeError(f"Failed to transfer file {url} after {self.max_retries} attempts.")
 
 
+#TODO: MAKE SURE ALL URLS PUT IN COME OUT
 @dataclass
 class CheckpointFileTransfer(beam.PTransform):
     """
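
Note: the patch tightens the transfer retry defaults on TransferFilesWithConcurrency (max_retries 5 -> 3, initial_backoff 1.0 -> 2.0) but the retry loop inside transfer_file is not shown in the diff. As a rough, hypothetical sketch of how such parameters conventionally combine (delay = initial_backoff * backoff_factor ** attempt), not pangeo-forge-recipes code:

import time


def retry_with_backoff(func, max_retries=3, initial_backoff=2.0, backoff_factor=2.0):
    """Call ``func`` with no arguments, retrying with exponential backoff on failure."""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # With the patched defaults this sleeps 2.0 s after the first failure
            # and 4.0 s after the second, before the final (third) attempt.
            time.sleep(initial_backoff * backoff_factor**attempt)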