From 97562502e7f293e312249d111c3df6bfe5ce996b Mon Sep 17 00:00:00 2001 From: Arnav Garg Date: Thu, 15 Sep 2022 23:40:07 -0500 Subject: [PATCH] clean up code further --- ludwig/hyperopt/execution.py | 64 ++++++++---------------------------- 1 file changed, 13 insertions(+), 51 deletions(-) diff --git a/ludwig/hyperopt/execution.py b/ludwig/hyperopt/execution.py index 59199f95755..9c6bd0b6d4d 100644 --- a/ludwig/hyperopt/execution.py +++ b/ludwig/hyperopt/execution.py @@ -308,41 +308,18 @@ def _cpu_resources_per_trial_non_none(self): def _gpu_resources_per_trial_non_none(self): return self.gpu_resources_per_trial if self.gpu_resources_per_trial is not None else 0 - def _get_remote_checkpoint_dir(self, trial_dir: Path) -> Optional[Union[str, Tuple[str, str]]]: + def _get_remote_checkpoint_dir( + self, trial_dir: Path, use_tmp: bool = False + ) -> Optional[Union[str, Tuple[str, str]]]: """Get the path to remote checkpoint directory.""" if self.sync_config is None: return None if self.sync_config.upload_dir is not None: # Cloud storage sync config - remote_checkpoint_dir = os.path.join( - self.sync_config.upload_dir, *_get_relative_checkpoints_dir_parts(trial_dir) - ) - return remote_checkpoint_dir - elif self.kubernetes_namespace is not None: - # Kubernetes sync config. Returns driver node name and path. - # When running on kubernetes, each trial is rsynced to the node running the main process. - node_name = self._get_kubernetes_node_address_by_ip()(self.head_node_ip) - return (node_name, trial_dir) - else: - logger.warning( - "Checkpoint syncing disabled as it is only supported to remote cloud storage or on Kubernetes " - "clusters. To use syncing, set the kubernetes_namespace in the config or use a cloud URI " - "as the output directory." - ) - return None - - def _get_tmp_remote_checkpoint_dir(self, trial_dir: Path) -> Optional[Union[str, Tuple[str, str]]]: - """Get the path to remote checkpoint directory.""" - if self.sync_config is None: - return None - - if self.sync_config.upload_dir is not None: - # Cloud storage sync config - remote_checkpoint_dir = os.path.join( - self.sync_config.upload_dir, "tmp", *_get_relative_checkpoints_dir_parts(trial_dir) - ) - return remote_checkpoint_dir + if use_tmp: + return os.path.join(self.sync_config.upload_dir, "tmp", *_get_relative_checkpoints_dir_parts(trial_dir)) + return os.path.join(self.sync_config.upload_dir, *_get_relative_checkpoints_dir_parts(trial_dir)) elif self.kubernetes_namespace is not None: # Kubernetes sync config. Returns driver node name and path. # When running on kubernetes, each trial is rsynced to the node running the main process. @@ -424,7 +401,6 @@ def _evaluate_best_model( backend, debug, ): - print(f"[_evaluate_best_model] Evaluating best model at path: {best_model_path}") best_model = LudwigModel.load( os.path.join(best_model_path, "model"), backend=backend, @@ -523,8 +499,8 @@ def __init__(self): self.training_set_metadata = None def _get_remote_checkpoint_dir(self) -> Optional[Union[str, Tuple[str, str]]]: - # sync client has to be recreated to avoid issues with serialization - return tune_executor._get_tmp_remote_checkpoint_dir(trial_dir) + # Sync client has to be recreated to avoid issues with serialization + return tune_executor._get_remote_checkpoint_dir(trial_dir, use_tmp=True) def _checkpoint_progress(self, trainer, progress_tracker, save_path) -> None: """Checkpoints the progress tracker.""" @@ -650,9 +626,7 @@ def _run(): if self.sync_config is not None: remote_checkpoint_dir = self._get_remote_checkpoint_dir(trial_dir) - tmp_remote_checkpoint_dir = self._get_tmp_remote_checkpoint_dir(trial_dir) - print("Remote Checkpoint Dir: ", remote_checkpoint_dir) - print("Tmp Remote Checkpoint Dir: ", tmp_remote_checkpoint_dir) + tmp_remote_checkpoint_dir = self._get_remote_checkpoint_dir(trial_dir, use_tmp=True) def check_queue(): qsize = ray_queue.qsize() @@ -663,7 +637,7 @@ def check_queue(): self.sync_client.sync_down(remote_checkpoint_dir, str(trial_dir.absolute())) self.sync_client.wait() else: - # Sync down from tmp_dir to remote_checkpoint_dir + # Sync down from tmp_remote_checkpoint_dir to local trial dir print( f"[Trial Driver] Syncing down from {tmp_remote_checkpoint_dir}" f" to {str(trial_dir.absolute())}" @@ -854,7 +828,7 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None): from ludwig.hyperopt.syncer import RemoteSyncer # self.sync_client = get_node_to_storage_syncer(SyncConfig(upload_dir=output_directory)) - self.sync_client = RemoteSyncer(sync_period=60, backend=backend) + self.sync_client = RemoteSyncer(backend=backend) # elif self.sync_function_template: # self.sync_client = CommandBasedClient( # sync_up_template=self.sync_function_template, @@ -867,18 +841,9 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None): # Build Sync Config # if self.sync_function_template: if _ray_200: - print(f"Using {self.sync_client} as the syncer in SyncConfig") - self.sync_config = tune.SyncConfig( - upload_dir=output_directory, - syncer=self.sync_client, - sync_period=60, - ) + self.sync_config = tune.SyncConfig(upload_dir=output_directory, syncer=self.sync_client) else: - self.sync_config = tune.SyncConfig( - sync_to_driver=False, - upload_dir=output_directory, - sync_period=60, - ) + self.sync_config = tune.SyncConfig(sync_to_driver=False, upload_dir=output_directory) elif self.kubernetes_namespace: from ray.tune.integration.kubernetes import KubernetesSyncClient, NamespacedKubernetesSyncer @@ -886,9 +851,6 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None): self.sync_config = tune.SyncConfig(sync_to_driver=NamespacedKubernetesSyncer(self.kubernetes_namespace)) self.sync_client = KubernetesSyncClient(self.kubernetes_namespace) - print(f"Sync Client: {self.sync_client}") - print(f"Sync Config: {self.sync_config}") - run_experiment_trial_params = tune.with_parameters(run_experiment_trial, local_hyperopt_dict=hyperopt_dict) register_trainable(f"trainable_func_f{hash_dict(config).decode('ascii')}", run_experiment_trial_params)