Skip to content

Commit

Permalink
Add compatibility with 2.0 for syncConfig and SyncClient creation
Browse files Browse the repository at this point in the history
  • Loading branch information
arnavgarg1 committed Sep 15, 2022
1 parent 80412e7 commit 05156c8
Showing 1 changed file with 26 additions and 19 deletions.
45 changes: 26 additions & 19 deletions ludwig/hyperopt/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
from ray.tune import ExperimentAnalysis, register_trainable, Stopper
from ray.tune.schedulers.resource_changing_scheduler import DistributeResources, ResourceChangingScheduler
from ray.tune.suggest import BasicVariantGenerator, ConcurrencyLimiter
from ray.tune.sync_client import CommandBasedClient

# from ray.tune.sync_client import CommandBasedClient
from ray.tune.utils import wait_for_gpu
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.util.queue import Queue as RayQueue
Expand Down Expand Up @@ -845,14 +846,31 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
)

if has_remote_protocol(output_directory):
run_experiment_trial = tune.durable(run_experiment_trial)
if not _ray_200:
run_experiment_trial = tune.durable(run_experiment_trial)

# Build Sync Client
if _ray_200:
from ludwig.hyperopt.syncer import RemoteSyncer

# self.sync_client = get_node_to_storage_syncer(SyncConfig(upload_dir=output_directory))
self.sync_client = RemoteSyncer(sync_period=60, backend=backend)
# elif self.sync_function_template:
# self.sync_client = CommandBasedClient(
# sync_up_template=self.sync_function_template,
# sync_down_template=self.sync_function_template,
# delete_template=self.delete_function_template, # No errors if this is None
# )
else:
self.sync_client = get_cloud_sync_client(output_directory)

# Build Sync Config
if self.sync_function_template:
# if self.sync_function_template:
if _ray_200:
print(f"Using {self.sync_client} as the syncer in SyncConfig")
self.sync_config = tune.SyncConfig(
sync_to_driver=False,
upload_dir=output_directory,
syncer=self.sync_function_template,
syncer=self.sync_client,
sync_period=60,
)
else:
Expand All @@ -862,26 +880,15 @@ def run_experiment_trial(config, local_hyperopt_dict, checkpoint_dir=None):
sync_period=60,
)

# Build Sync Client
if _ray_200:
from ludwig.hyperopt.syncer import RemoteSyncer

# self.sync_client = get_node_to_storage_syncer(SyncConfig(upload_dir=output_directory))
self.sync_client = RemoteSyncer(backend=backend, sync_period=60)
elif self.sync_function_template:
self.sync_client = CommandBasedClient(
sync_up_template=self.sync_function_template,
sync_down_template=self.sync_function_template,
delete_template=self.delete_function_template, # No errors if this is None
)
else:
self.sync_client = get_cloud_sync_client(output_directory)
elif self.kubernetes_namespace:
from ray.tune.integration.kubernetes import KubernetesSyncClient, NamespacedKubernetesSyncer

self.sync_config = tune.SyncConfig(sync_to_driver=NamespacedKubernetesSyncer(self.kubernetes_namespace))
self.sync_client = KubernetesSyncClient(self.kubernetes_namespace)

print(f"Sync Client: {self.sync_client}")
print(f"Sync Config: {self.sync_config}")

run_experiment_trial_params = tune.with_parameters(run_experiment_trial, local_hyperopt_dict=hyperopt_dict)
register_trainable(f"trainable_func_f{hash_dict(config).decode('ascii')}", run_experiment_trial_params)

Expand Down

0 comments on commit 05156c8

Please sign in to comment.