Skip to content

Commit

Permalink
[SDK-338] Added option to enable Rubix (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijithshankar93 authored and msumit committed Feb 21, 2019
1 parent c164a55 commit bad7cd9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 13 deletions.
3 changes: 2 additions & 1 deletion bin/qds.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ def _create_cluster_info(arguments, api_version):
is_ha=arguments.is_ha,
env_name=arguments.env_name,
python_version=arguments.python_version,
r_version=arguments.r_version)
r_version=arguments.r_version,
enable_rubix=arguments.enable_rubix)
else:
cluster_info = ClusterInfo(arguments.label,
arguments.aws_access_key_id,
Expand Down
21 changes: 18 additions & 3 deletions qds_sdk/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,17 @@ def _parse_create_update(cls, args, action, api_version):
dest="ebs_volume_size",
type=int,
help="Size of each EBS volume, in GB",)
enable_rubix_group = hadoop_group.add_mutually_exclusive_group()
enable_rubix_group.add_argument("--enable-rubix",
dest="enable_rubix",
action="store_true",
default=None,
help="Enable rubix for cluster", )
enable_rubix_group.add_argument("--no-enable-rubix",
dest="enable_rubix",
action="store_false",
default=None,
help="Do not enable rubix for cluster", )

hadoop2 = hadoop_group.add_mutually_exclusive_group()
hadoop2.add_argument("--use-hadoop2",
Expand Down Expand Up @@ -1034,7 +1045,8 @@ def set_cluster_info(self, aws_access_key_id=None,
is_ha=None,
env_name=None,
python_version=None,
r_version=None):
r_version=None,
enable_rubix=None):
"""
Kwargs:
Expand Down Expand Up @@ -1159,6 +1171,7 @@ def set_cluster_info(self, aws_access_key_id=None,
`r_version`: Version of R for environment. (For Spark clusters)
`enable_rubix`: Enable rubix on the cluster (For Presto clusters)
"""

self.disallow_cluster_termination = disallow_cluster_termination
Expand All @@ -1169,7 +1182,7 @@ def set_cluster_info(self, aws_access_key_id=None,
node_base_cooldown_period, node_spot_cooldown_period, root_volume_size)
self.set_ec2_settings(aws_access_key_id, aws_secret_access_key, aws_region, aws_availability_zone, vpc_id, subnet_id,
master_elastic_ip, bastion_node_public_dns, role_instance_profile)
self.set_hadoop_settings(custom_config, use_hbase, use_hadoop2, use_spark, use_qubole_placement_policy, is_ha)
self.set_hadoop_settings(custom_config, use_hbase, use_hadoop2, use_spark, use_qubole_placement_policy, is_ha, enable_rubix)
self.set_spot_instance_settings(maximum_bid_price_percentage, timeout_for_request, maximum_spot_instance_percentage)
self.set_stable_spot_instance_settings(stable_maximum_bid_price_percentage, stable_timeout_for_request, stable_allow_fallback)
self.set_spot_block_settings(spot_block_duration)
Expand Down Expand Up @@ -1230,13 +1243,15 @@ def set_hadoop_settings(self, custom_config=None,
use_hadoop2=None,
use_spark=None,
use_qubole_placement_policy=None,
is_ha=None):
is_ha=None,
enable_rubix=None):
self.hadoop_settings['custom_config'] = custom_config
self.hadoop_settings['use_hbase'] = use_hbase
self.hadoop_settings['use_hadoop2'] = use_hadoop2
self.hadoop_settings['use_spark'] = use_spark
self.hadoop_settings['use_qubole_placement_policy'] = use_qubole_placement_policy
self.hadoop_settings['is_ha'] = is_ha
self.hadoop_settings['enable_rubix'] = enable_rubix

def set_spot_instance_settings(self, maximum_bid_price_percentage=None,
timeout_for_request=None,
Expand Down
24 changes: 20 additions & 4 deletions qds_sdk/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def set_engine_config(self,
dbtap_id=None,
fernet_key=None,
overrides=None,
is_ha=None):
is_ha=None,
enable_rubix=None):
'''
Args:
Expand Down Expand Up @@ -60,10 +61,11 @@ def set_engine_config(self,
is_ha: Enabling HA config for cluster
is_deeplearning : this is a deeplearning cluster config
enable_rubix: Enable rubix on the cluster
'''

self.set_hadoop_settings(custom_hadoop_config, use_qubole_placement_policy, is_ha, fairscheduler_config_xml, default_pool)
self.set_hadoop_settings(custom_hadoop_config, use_qubole_placement_policy, is_ha, fairscheduler_config_xml, default_pool, enable_rubix)
self.set_presto_settings(presto_version, custom_presto_config)
self.set_spark_settings(spark_version, custom_spark_config)
self.set_airflow_settings(dbtap_id, fernet_key, overrides)
Expand All @@ -81,11 +83,13 @@ def set_hadoop_settings(self,
use_qubole_placement_policy=None,
is_ha=None,
fairscheduler_config_xml=None,
default_pool=None):
default_pool=None,
enable_rubix=None):
self.hadoop_settings['custom_hadoop_config'] = custom_hadoop_config
self.hadoop_settings['use_qubole_placement_policy'] = use_qubole_placement_policy
self.hadoop_settings['is_ha'] = is_ha
self.set_fairscheduler_settings(fairscheduler_config_xml, default_pool)
self.hadoop_settings['enable_rubix'] = enable_rubix

def set_presto_settings(self,
presto_version=None,
Expand Down Expand Up @@ -123,7 +127,8 @@ def set_engine_config_settings(self, arguments):
custom_spark_config=arguments.custom_spark_config,
dbtap_id=arguments.dbtap_id,
fernet_key=arguments.fernet_key,
overrides=arguments.overrides)
overrides=arguments.overrides,
enable_rubix=arguments.enable_rubix)

@staticmethod
def engine_parser(argparser):
Expand Down Expand Up @@ -153,6 +158,17 @@ def engine_parser(argparser):
default=None,
help="Do not use Qubole Block Placement policy" +
" for clusters with spot nodes", )
enable_rubix_group = hadoop_settings_group.add_mutually_exclusive_group()
enable_rubix_group.add_argument("--enable-rubix",
dest="enable_rubix",
action="store_true",
default=None,
help="Enable rubix for cluster", )
enable_rubix_group.add_argument("--no-enable-rubix",
dest="enable_rubix",
action="store_false",
default=None,
help="Do not enable rubix for cluster", )

fairscheduler_group = argparser.add_argument_group(
"fairscheduler configuration options")
Expand Down
37 changes: 37 additions & 0 deletions tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1672,6 +1672,43 @@ def test_root_volume_size_invalid_v13(self):
qds.main()


def test_use_enable_rubix_v13(self):
sys.argv = ['qds.py', '--version', 'v1.3', 'cluster', 'create', '--label', 'test_label',
'--access-key-id', 'aki', '--secret-access-key', 'sak',
'--enable-rubix']
print_command()
Connection._api_call = Mock(return_value={})
qds.main()
Connection._api_call.assert_called_with('POST', 'clusters',
{'label': ['test_label'],
'ec2_settings': {'compute_secret_key': 'sak',
'compute_access_key': 'aki'},
'hadoop_settings': {'enable_rubix': True},
})

def test_no_use_enable_rubix_v13(self):
sys.argv = ['qds.py', '--version', 'v1.3', 'cluster', 'create', '--label', 'test_label',
'--access-key-id', 'aki', '--secret-access-key', 'sak',
'--no-enable-rubix']
print_command()
Connection._api_call = Mock(return_value={})
qds.main()
Connection._api_call.assert_called_with('POST', 'clusters',
{'label': ['test_label'],
'ec2_settings': {'compute_secret_key': 'sak',
'compute_access_key': 'aki'},
'hadoop_settings': {'enable_rubix': False},
})

@unittest.skipIf(sys.version_info < (2, 7, 0), "Known failure on Python 2.6")
def test_conflict_enable_rubix_v13(self):
sys.argv = ['qds.py', '--version', 'v1.3', 'cluster', 'create', '--label', 'test_label',
'--access-key-id', 'aki', '--secret-access-key', 'sak',
'--enable-rubix', '--no-enable-rubix']
print_command()
with self.assertRaises(SystemExit):
qds.main()

class TestClusterUpdate(QdsCliTestCase):
def test_minimal(self):
sys.argv = ['qds.py', 'cluster', 'update', '123']
Expand Down
14 changes: 9 additions & 5 deletions tests/test_clusterv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ def test_presto_engine_config(self):
temp.write("config.properties:\na=1\nb=2".encode("utf8"))
temp.flush()
sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label',
'--flavour', 'presto', '--presto-custom-config', temp.name]
'--flavour', 'presto', '--enable-rubix' , '--presto-custom-config', temp.name]
Qubole.cloud = None
print_command()
Connection._api_call = Mock(return_value={})
Expand All @@ -381,7 +381,10 @@ def test_presto_engine_config(self):
{'engine_config':
{'flavour': 'presto',
'presto_settings': {
'custom_presto_config': 'config.properties:\na=1\nb=2'}},
'custom_presto_config': 'config.properties:\na=1\nb=2'},
'hadoop_settings':{
'enable_rubix': True
}},
'cluster_info': {'label': ['test_label']}})

def test_spark_engine_config(self):
Expand Down Expand Up @@ -667,16 +670,17 @@ def test_engine_config(self):
temp.write("a=1\nb=2".encode("utf8"))
temp.flush()
sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'update', '123',
'--use-qubole-placement-policy', '--custom-hadoop-config',
temp.name]
'--use-qubole-placement-policy', '--enable-rubix',
'--custom-hadoop-config',temp.name]
Qubole.cloud = None
print_command()
Connection._api_call = Mock(return_value={})
qds.main()
Connection._api_call.assert_called_with('PUT', 'clusters/123', {'engine_config':
{'hadoop_settings':
{'use_qubole_placement_policy': True,
'custom_hadoop_config': 'a=1\nb=2'}}
'custom_hadoop_config': 'a=1\nb=2',
'enable_rubix': True}}
})

def test_cluster_info(self):
Expand Down

0 comments on commit bad7cd9

Please sign in to comment.