Skip to content

Commit

Permalink
Update syncer.py
Browse files Browse the repository at this point in the history
  • Loading branch information
tgaddair authored Oct 10, 2022
1 parent 4fa404b commit 2ab6b68
Showing 1 changed file with 1 addition and 49 deletions.
50 changes: 1 addition & 49 deletions ludwig/hyperopt/syncer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Any, Callable, Dict, List, Optional, Tuple

from ray import tune
from ray.tune.syncer import _BackgroundSyncer, get_node_to_storage_syncer, Syncer
from ray.tune.syncer import _BackgroundSyncer

from ludwig.utils.data_utils import use_credentials
from ludwig.utils.fs_utils import delete, download, upload
from ludwig.utils.misc_utils import memoized_method


class RemoteSyncer(_BackgroundSyncer):
Expand Down Expand Up @@ -34,49 +32,3 @@ def __reduce__(self):
deserializer = RemoteSyncer
serialized_data = (self.sync_period, self.creds)
return deserializer, serialized_data


class LazyFsspecSyncer(Syncer):
def __init__(self, upload_dir: str, creds: Optional[Dict[str, Any]] = None):
super().__init__()
self.upload_dir = upload_dir
self.creds = creds
self._syncer = None

def sync_up(self, *args, **kwargs) -> bool:
with use_credentials(self.creds):
return self.syncer().sync_up(*args, **kwargs)

def sync_down(self, *args, **kwargs) -> bool:
with use_credentials(self.creds):
return self.syncer().sync_down(*args, **kwargs)

def delete(self, *args, **kwargs) -> bool:
with use_credentials(self.creds):
return self.syncer().delete(*args, **kwargs)

@memoized_method(maxsize=1)
def syncer(self):
if self._syncer is None:
sync_config = tune.SyncConfig(upload_dir=self.upload_dir)
self._syncer = get_node_to_storage_syncer(sync_config)
return self._syncer


class WrappedSyncer(Syncer):
def __init__(self, syncer: Syncer, creds: Optional[Dict[str, Any]] = None):
super().__init__()
self.syncer = syncer
self.creds = creds

def sync_up(self, *args, **kwargs) -> bool:
with use_credentials(self.creds):
return self.syncer.sync_up(*args, **kwargs)

def sync_down(self, *args, **kwargs) -> bool:
with use_credentials(self.creds):
return self.syncer.sync_down(*args, **kwargs)

def delete(self, *args, **kwargs) -> bool:
with use_credentials(self.creds):
return self.syncer.delete(*args, **kwargs)

0 comments on commit 2ab6b68

Please sign in to comment.