diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 160ad1a6..d64af2ed 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -147,12 +147,16 @@ class TransferFilesWithConcurrency(beam.DoFn): concurrency_per_executor: The number of concurrent threads per executor. secrets: Optional dictionary containing secrets required for accessing the transfer target. open_kwargs: Optional dictionary of keyword arguments to be passed when opening files. + fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to + replace asynchronous code with synchronous implementations to potentially address + deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019 """ transfer_target: CacheFSSpecTarget max_concurrency: int secrets: Optional[Dict] = None open_kwargs: Optional[Dict] = None + fsspec_sync_patch: bool = False def process(self, indexed_urls): with ThreadPoolExecutor(max_workers=self.max_concurrency) as executor: @@ -170,7 +174,7 @@ def process(self, indexed_urls): def transfer_file(self, index: Index, url: str) -> Tuple[Index, str]: open_kwargs = self.open_kwargs or {} - self.transfer_target.cache_file(url, self.secrets, **open_kwargs) + self.transfer_target.cache_file(url, self.secrets, self.fsspec_sync_patch, **open_kwargs) return (index, self.transfer_target._full_path(url)) @@ -188,6 +192,9 @@ class CheckpointFileTransfer(beam.PTransform): number to limit total cluster concurrency. Default is 20. concurrency_per_executor (Optional[int]): The number of concurrent threads per executor. Default is 10. + fsspec_sync_patch: Experimental. Likely slower. When enabled, this attempts to + replace asynchronous code with synchronous implementations to potentially address + deadlocking issues. cf. https://github.com/h5py/h5py/issues/2019 """ transfer_target: Union[str, CacheFSSpecTarget] @@ -195,6 +202,7 @@ class CheckpointFileTransfer(beam.PTransform): open_kwargs: Optional[dict] = None max_executors: int = 20 concurrency_per_executor: int = 10 + fsspec_sync_patch: bool = False def assign_keys(self, element) -> Tuple[int, Any]: index, url = element @@ -219,6 +227,7 @@ def expand(self, pcoll): secrets=self.secrets, open_kwargs=self.open_kwargs, max_concurrency=self.concurrency_per_executor, + fsspec_sync_patch=self.fsspec_sync_patch, ) ) )