diff --git a/ludwig/hyperopt/execution.py b/ludwig/hyperopt/execution.py index 5d8d5e85bdd..59199f95755 100644 --- a/ludwig/hyperopt/execution.py +++ b/ludwig/hyperopt/execution.py @@ -21,7 +21,8 @@ from ray.tune import ExperimentAnalysis, register_trainable, Stopper from ray.tune.schedulers.resource_changing_scheduler import DistributeResources, ResourceChangingScheduler from ray.tune.suggest import BasicVariantGenerator, ConcurrencyLimiter -from ray.tune.sync_client import CommandBasedClient + +# from ray.tune.sync_client import CommandBasedClient from ray.tune.utils import wait_for_gpu from ray.tune.utils.placement_groups import PlacementGroupFactory from ray.util.queue import Queue as RayQueue @@ -845,14 +846,31 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None): ) if has_remote_protocol(output_directory): - run_experiment_trial = tune.durable(run_experiment_trial) + if not _ray_200: + run_experiment_trial = tune.durable(run_experiment_trial) + + # Build Sync Client + if _ray_200: + from ludwig.hyperopt.syncer import RemoteSyncer + + # self.sync_client = get_node_to_storage_syncer(SyncConfig(upload_dir=output_directory)) + self.sync_client = RemoteSyncer(sync_period=60, backend=backend) + # elif self.sync_function_template: + # self.sync_client = CommandBasedClient( + # sync_up_template=self.sync_function_template, + # sync_down_template=self.sync_function_template, + # delete_template=self.delete_function_template, # No errors if this is None + # ) + else: + self.sync_client = get_cloud_sync_client(output_directory) # Build Sync Config - if self.sync_function_template: + # if self.sync_function_template: + if _ray_200: + print(f"Using {self.sync_client} as the syncer in SyncConfig") self.sync_config = tune.SyncConfig( - sync_to_driver=False, upload_dir=output_directory, - syncer=self.sync_function_template, + syncer=self.sync_client, sync_period=60, ) else: @@ -862,26 +880,15 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None): sync_period=60, ) - # Build Sync Client - if _ray_200: - from ludwig.hyperopt.syncer import RemoteSyncer - - # self.sync_client = get_node_to_storage_syncer(SyncConfig(upload_dir=output_directory)) - self.sync_client = RemoteSyncer(backend=backend, sync_period=60) - elif self.sync_function_template: - self.sync_client = CommandBasedClient( - sync_up_template=self.sync_function_template, - sync_down_template=self.sync_function_template, - delete_template=self.delete_function_template, # No errors if this is None - ) - else: - self.sync_client = get_cloud_sync_client(output_directory) elif self.kubernetes_namespace: from ray.tune.integration.kubernetes import KubernetesSyncClient, NamespacedKubernetesSyncer self.sync_config = tune.SyncConfig(sync_to_driver=NamespacedKubernetesSyncer(self.kubernetes_namespace)) self.sync_client = KubernetesSyncClient(self.kubernetes_namespace) + print(f"Sync Client: {self.sync_client}") + print(f"Sync Config: {self.sync_config}") + run_experiment_trial_params = tune.with_parameters(run_experiment_trial, local_hyperopt_dict=hyperopt_dict) register_trainable(f"trainable_func_f{hash_dict(config).decode('ascii')}", run_experiment_trial_params)