clean up code further
arnavgarg1 committed Sep 16, 2022
1 parent b928237 commit 9756250
Showing 1 changed file with 13 additions and 51 deletions.
ludwig/hyperopt/execution.py: 13 additions & 51 deletions

@@ -308,41 +308,18 @@ def _cpu_resources_per_trial_non_none(self):
def _gpu_resources_per_trial_non_none(self):
return self.gpu_resources_per_trial if self.gpu_resources_per_trial is not None else 0

def _get_remote_checkpoint_dir(self, trial_dir: Path) -> Optional[Union[str, Tuple[str, str]]]:
def _get_remote_checkpoint_dir(
self, trial_dir: Path, use_tmp: bool = False
) -> Optional[Union[str, Tuple[str, str]]]:
"""Get the path to remote checkpoint directory."""
if self.sync_config is None:
return None

if self.sync_config.upload_dir is not None:
# Cloud storage sync config
remote_checkpoint_dir = os.path.join(
self.sync_config.upload_dir, *_get_relative_checkpoints_dir_parts(trial_dir)
)
return remote_checkpoint_dir
elif self.kubernetes_namespace is not None:
# Kubernetes sync config. Returns driver node name and path.
# When running on kubernetes, each trial is rsynced to the node running the main process.
node_name = self._get_kubernetes_node_address_by_ip()(self.head_node_ip)
return (node_name, trial_dir)
else:
logger.warning(
"Checkpoint syncing disabled as it is only supported to remote cloud storage or on Kubernetes "
"clusters. To use syncing, set the kubernetes_namespace in the config or use a cloud URI "
"as the output directory."
)
return None

def _get_tmp_remote_checkpoint_dir(self, trial_dir: Path) -> Optional[Union[str, Tuple[str, str]]]:
"""Get the path to remote checkpoint directory."""
if self.sync_config is None:
return None

if self.sync_config.upload_dir is not None:
# Cloud storage sync config
remote_checkpoint_dir = os.path.join(
self.sync_config.upload_dir, "tmp", *_get_relative_checkpoints_dir_parts(trial_dir)
)
return remote_checkpoint_dir
if use_tmp:
return os.path.join(self.sync_config.upload_dir, "tmp", *_get_relative_checkpoints_dir_parts(trial_dir))
return os.path.join(self.sync_config.upload_dir, *_get_relative_checkpoints_dir_parts(trial_dir))
elif self.kubernetes_namespace is not None:
# Kubernetes sync config. Returns driver node name and path.
# When running on kubernetes, each trial is rsynced to the node running the main process.
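
Note: the removed and added lines are interleaved above, so for readability, here is roughly how the consolidated helper reads after this commit, reconstructed from the hunk. The enclosing class, logger setup, imports, and the _get_relative_checkpoints_dir_parts helper are elided; the Kubernetes and warning branches carry over unchanged.

    def _get_remote_checkpoint_dir(
        self, trial_dir: Path, use_tmp: bool = False
    ) -> Optional[Union[str, Tuple[str, str]]]:
        """Get the path to remote checkpoint directory."""
        if self.sync_config is None:
            return None

        if self.sync_config.upload_dir is not None:
            # Cloud storage sync config; use_tmp selects a "tmp" staging subdirectory.
            if use_tmp:
                return os.path.join(self.sync_config.upload_dir, "tmp", *_get_relative_checkpoints_dir_parts(trial_dir))
            return os.path.join(self.sync_config.upload_dir, *_get_relative_checkpoints_dir_parts(trial_dir))
        elif self.kubernetes_namespace is not None:
            # Kubernetes sync config. Returns driver node name and path; each trial
            # is rsynced to the node running the main process.
            node_name = self._get_kubernetes_node_address_by_ip()(self.head_node_ip)
            return (node_name, trial_dir)
        else:
            logger.warning(
                "Checkpoint syncing disabled as it is only supported to remote cloud storage or on Kubernetes "
                "clusters. To use syncing, set the kubernetes_namespace in the config or use a cloud URI "
                "as the output directory."
            )
            return None
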
@@ -424,7 +401,6 @@ def _evaluate_best_model(
backend,
debug,
):
print(f"[_evaluate_best_model] Evaluating best model at path: {best_model_path}")
best_model = LudwigModel.load(
os.path.join(best_model_path, "model"),
backend=backend,
@@ -523,8 +499,8 @@ def __init__(self):
self.training_set_metadata = None

def _get_remote_checkpoint_dir(self) -> Optional[Union[str, Tuple[str, str]]]:
# sync client has to be recreated to avoid issues with serialization
return tune_executor._get_tmp_remote_checkpoint_dir(trial_dir)
# Sync client has to be recreated to avoid issues with serialization
return tune_executor._get_remote_checkpoint_dir(trial_dir, use_tmp=True)

def _checkpoint_progress(self, trainer, progress_tracker, save_path) -> None:
"""Checkpoints the progress tracker."""
@@ -650,9 +626,7 @@ def _run():

if self.sync_config is not None:
remote_checkpoint_dir = self._get_remote_checkpoint_dir(trial_dir)
tmp_remote_checkpoint_dir = self._get_tmp_remote_checkpoint_dir(trial_dir)
print("Remote Checkpoint Dir: ", remote_checkpoint_dir)
print("Tmp Remote Checkpoint Dir: ", tmp_remote_checkpoint_dir)
tmp_remote_checkpoint_dir = self._get_remote_checkpoint_dir(trial_dir, use_tmp=True)

def check_queue():
qsize = ray_queue.qsize()
@@ -663,7 +637,7 @@ def check_queue():
self.sync_client.sync_down(remote_checkpoint_dir, str(trial_dir.absolute()))
self.sync_client.wait()
else:
# Sync down from tmp_dir to remote_checkpoint_dir
# Sync down from tmp_remote_checkpoint_dir to local trial dir
print(
f"[Trial Driver] Syncing down from {tmp_remote_checkpoint_dir}"
f" to {str(trial_dir.absolute())}"
@@ -854,7 +828,7 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
from ludwig.hyperopt.syncer import RemoteSyncer

# self.sync_client = get_node_to_storage_syncer(SyncConfig(upload_dir=output_directory))
self.sync_client = RemoteSyncer(sync_period=60, backend=backend)
self.sync_client = RemoteSyncer(backend=backend)
# elif self.sync_function_template:
# self.sync_client = CommandBasedClient(
# sync_up_template=self.sync_function_template,
@@ -867,28 +841,16 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
# Build Sync Config
# if self.sync_function_template:
if _ray_200:
print(f"Using {self.sync_client} as the syncer in SyncConfig")
self.sync_config = tune.SyncConfig(
upload_dir=output_directory,
syncer=self.sync_client,
sync_period=60,
)
self.sync_config = tune.SyncConfig(upload_dir=output_directory, syncer=self.sync_client)
else:
self.sync_config = tune.SyncConfig(
sync_to_driver=False,
upload_dir=output_directory,
sync_period=60,
)
self.sync_config = tune.SyncConfig(sync_to_driver=False, upload_dir=output_directory)

elif self.kubernetes_namespace:
from ray.tune.integration.kubernetes import KubernetesSyncClient, NamespacedKubernetesSyncer

self.sync_config = tune.SyncConfig(sync_to_driver=NamespacedKubernetesSyncer(self.kubernetes_namespace))
self.sync_client = KubernetesSyncClient(self.kubernetes_namespace)

print(f"Sync Client: {self.sync_client}")
print(f"Sync Config: {self.sync_config}")

run_experiment_trial_params = tune.with_parameters(run_experiment_trial, local_hyperopt_dict=hyperopt_dict)
register_trainable(f"trainable_func_f{hash_dict(config).decode('ascii')}", run_experiment_trial_params)

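
For context, a minimal sketch of how a custom syncer plugs into tune.SyncConfig, as the code above does with RemoteSyncer. This assumes Ray 2.x, where ray.tune.syncer.Syncer is the base class exposing sync_up/sync_down/delete; the LoggingSyncer class and bucket URI here are illustrative stand-ins, not Ludwig's actual RemoteSyncer.

    from typing import List, Optional

    from ray import tune
    from ray.tune.syncer import Syncer  # Ray 2.x custom-syncer base class


    class LoggingSyncer(Syncer):
        """Illustrative syncer that only logs; a real syncer copies checkpoint files."""

        def sync_up(self, local_dir: str, remote_dir: str, exclude: Optional[List[str]] = None) -> bool:
            print(f"sync up {local_dir} -> {remote_dir}")
            return True

        def sync_down(self, remote_dir: str, local_dir: str, exclude: Optional[List[str]] = None) -> bool:
            print(f"sync down {remote_dir} -> {local_dir}")
            return True

        def delete(self, remote_dir: str) -> bool:
            print(f"delete {remote_dir}")
            return True


    # Hypothetical output directory; Ludwig passes its hyperopt output_directory here.
    sync_config = tune.SyncConfig(upload_dir="s3://my-bucket/hyperopt", syncer=LoggingSyncer())
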
