Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#713 Support autoscaling on EMR #2877

Merged
merged 5 commits into from
Feb 15, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 68 additions & 29 deletions aws/resource_aws_emr_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ func resourceAwsEMRCluster() *schema.Resource {
Optional: true,
Default: 0,
},
"autoscaling_policy": {
Type: schema.TypeString,
Optional: true,
DiffSuppressFunc: suppressEquivalentAwsPolicyDiffs,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Despite the naming, suppressEquivalentAwsPolicyDiffs is currently only for IAM-type policy documents. In this case we'll likely be looking for this combination of attribute handlers:

DiffSuppressFunc: suppressEquivalentJsonDiffs,
ValidateFunc: validateJsonString,
StateFunc: func(v interface{}) string {
	json, _ := structure.NormalizeJsonString(v)
	return json
},

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make the change now

ValidateFunc: validateJsonString,
},
"instance_role": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -724,6 +730,13 @@ func flattenInstanceGroups(igs []*emr.InstanceGroup) []map[string]interface{} {
attrs["instance_count"] = *ig.RequestedInstanceCount
attrs["instance_role"] = *ig.InstanceGroupType
attrs["instance_type"] = *ig.InstanceType

if ig.AutoScalingPolicy != nil {
attrs["autoscaling_policy"] = *ig.AutoScalingPolicy
} else {
attrs["autoscaling_policy"] = ""
}

attrs["name"] = *ig.Name
result = append(result, attrs)
}
Expand Down Expand Up @@ -871,7 +884,7 @@ func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActi
}

func expandInstanceGroupConfigs(instanceGroupConfigs []interface{}) []*emr.InstanceGroupConfig {
configsOut := []*emr.InstanceGroupConfig{}
instanceGroupConfig := []*emr.InstanceGroupConfig{}

for _, raw := range instanceGroupConfigs {
configAttributes := raw.(map[string]interface{})
Expand All @@ -886,42 +899,68 @@ func expandInstanceGroupConfigs(instanceGroupConfigs []interface{}) []*emr.Insta
InstanceCount: aws.Int64(int64(configInstanceCount)),
}

if bidPrice, ok := configAttributes["bid_price"]; ok {
if bidPrice != "" {
config.BidPrice = aws.String(bidPrice.(string))
config.Market = aws.String("SPOT")
} else {
config.Market = aws.String("ON_DEMAND")
applyBidPrice(config, configAttributes)
applyEbsConfig(configAttributes, config)
applyAutoScalingPolicy(configAttributes, config)

instanceGroupConfig = append(instanceGroupConfig, config)
}

return instanceGroupConfig
}

func applyBidPrice(config *emr.InstanceGroupConfig, configAttributes map[string]interface{}) {
if bidPrice, ok := configAttributes["bid_price"]; ok {
if bidPrice != "" {
config.BidPrice = aws.String(bidPrice.(string))
config.Market = aws.String("SPOT")
} else {
config.Market = aws.String("ON_DEMAND")
}
}
}

func applyEbsConfig(configAttributes map[string]interface{}, config *emr.InstanceGroupConfig) {
if rawEbsConfigs, ok := configAttributes["ebs_config"]; ok {
ebsConfig := &emr.EbsConfiguration{}

ebsBlockDeviceConfigs := make([]*emr.EbsBlockDeviceConfig, 0)
for _, rawEbsConfig := range rawEbsConfigs.(*schema.Set).List() {
rawEbsConfig := rawEbsConfig.(map[string]interface{})
ebsBlockDeviceConfig := &emr.EbsBlockDeviceConfig{
VolumesPerInstance: aws.Int64(int64(rawEbsConfig["volumes_per_instance"].(int))),
VolumeSpecification: &emr.VolumeSpecification{
SizeInGB: aws.Int64(int64(rawEbsConfig["size"].(int))),
VolumeType: aws.String(rawEbsConfig["type"].(string)),
},
}
if v, ok := rawEbsConfig["iops"].(int); ok && v != 0 {
ebsBlockDeviceConfig.VolumeSpecification.Iops = aws.Int64(int64(v))
}
ebsBlockDeviceConfigs = append(ebsBlockDeviceConfigs, ebsBlockDeviceConfig)
}
ebsConfig.EbsBlockDeviceConfigs = ebsBlockDeviceConfigs

if rawEbsConfigs, ok := configAttributes["ebs_config"]; ok {
ebsConfig := &emr.EbsConfiguration{}
config.EbsConfiguration = ebsConfig
}
}

ebsBlockDeviceConfigs := make([]*emr.EbsBlockDeviceConfig, 0)
for _, rawEbsConfig := range rawEbsConfigs.(*schema.Set).List() {
rawEbsConfig := rawEbsConfig.(map[string]interface{})
ebsBlockDeviceConfig := &emr.EbsBlockDeviceConfig{
VolumesPerInstance: aws.Int64(int64(rawEbsConfig["volumes_per_instance"].(int))),
VolumeSpecification: &emr.VolumeSpecification{
SizeInGB: aws.Int64(int64(rawEbsConfig["size"].(int))),
VolumeType: aws.String(rawEbsConfig["type"].(string)),
},
}
if v, ok := rawEbsConfig["iops"].(int); ok && v != 0 {
ebsBlockDeviceConfig.VolumeSpecification.Iops = aws.Int64(int64(v))
}
ebsBlockDeviceConfigs = append(ebsBlockDeviceConfigs, ebsBlockDeviceConfig)
}
ebsConfig.EbsBlockDeviceConfigs = ebsBlockDeviceConfigs
func applyAutoScalingPolicy(configAttributes map[string]interface{}, config *emr.InstanceGroupConfig) {
if rawAutoScalingPolicy, ok := configAttributes["autoscaling_policy"]; ok {
autoScalingConfig, _ := expandAutoScalingPolicy(rawAutoScalingPolicy.(string))
config.AutoScalingPolicy = autoScalingConfig
}
}

config.EbsConfiguration = ebsConfig
}
func expandAutoScalingPolicy(rawDefinitions string) (*emr.AutoScalingPolicy, error) {
var policy *emr.AutoScalingPolicy

configsOut = append(configsOut, config)
err := json.Unmarshal([]byte(rawDefinitions), &policy)
if err != nil {
return nil, fmt.Errorf("Error decoding JSON: %s", err)
}

return configsOut
return policy, nil
}

func expandConfigures(input string) []*emr.Configuration {
Expand Down
33 changes: 33 additions & 0 deletions aws/resource_aws_emr_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,39 @@ resource "aws_emr_cluster" "tf-test-cluster" {
volumes_per_instance = 1
}
bid_price = "0.30"
autoscaling_policy = <<EOT
{
"Constraints": {
"MinCapacity": 1,
"MaxCapacity": 2
},
"Rules": [
{
"Name": "ScaleOutMemoryPercentage",
"Description": "Scale out if YARNMemoryAvailablePercentage is less than 15",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 1,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"ComparisonOperator": "LESS_THAN",
"EvaluationPeriods": 1,
"MetricName": "YARNMemoryAvailablePercentage",
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"Statistic": "AVERAGE",
"Threshold": 15.0,
"Unit": "PERCENT"
}
}
}
]
}
EOT
},
{
instance_role = "MASTER"
Expand Down
46 changes: 45 additions & 1 deletion website/docs/r/emr_cluster.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,50 @@ resource "aws_emr_cluster" "emr-test-cluster" {
instance_profile = "${aws_iam_instance_profile.emr_profile.arn}"
}

instance_group {
instance_role = "CORE"
instance_type = "c4.large"
instance_count = "1"
ebs_config {
size = "40"
type = "gp2"
volumes_per_instance = 1
}
bid_price = "0.30"
autoscaling_policy = <<EOF
{
"Constraints": {
"MinCapacity": 1,
"MaxCapacity": 2
},
"Rules": [
{
"Name": "ScaleOutMemoryPercentage",
"Description": "Scale out if YARNMemoryAvailablePercentage is less than 15",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 1,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"ComparisonOperator": "LESS_THAN",
"EvaluationPeriods": 1,
"MetricName": "YARNMemoryAvailablePercentage",
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"Statistic": "AVERAGE",
"Threshold": 15.0,
"Unit": "PERCENT"
}
}
}
]
}
EOF
}
ebs_root_volume_size = 100

master_instance_type = "m3.xlarge"
Expand Down Expand Up @@ -130,7 +174,7 @@ Attributes for each task instance group in the cluster
* `name` - (Optional) Friendly name given to the instance group
* `bid_price` - (Optional) If set, the bid price for each EC2 instance in the instance group, expressed in USD. By setting this attribute, the instance group is being declared as a Spot Instance, and will implicitly create a Spot request. Leave this blank to use On-Demand Instances. `bid_price` can not be set for the `MASTER` instance group, since that group must always be On-Demand
* `ebs_config` - (Optional) A list of attributes for the EBS volumes attached to each instance in the instance group. Each `ebs_config` defined will result in additional EBS volumes being attached to _each_ instance in the instance group. Defined below

* `autoscaling_policy` - (Optional) The autoscaling policy document. This is a JSON formatted string. See [EMR Auto Scaling](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-automatic-scaling.html)

## ebs_config

Expand Down