Skip to content

Commit

Permalink
CloudFormation: Support AWS::EMR::InstanceGroupConfig (#7446)
Browse files Browse the repository at this point in the history
  • Loading branch information
bblommers authored Mar 8, 2024
1 parent b13e493 commit cb60935
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 6 deletions.
51 changes: 47 additions & 4 deletions moto/emr/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def __init__(
self.instance_fleet_id = instance_fleet_id


class FakeInstanceGroup(BaseModel):
class FakeInstanceGroup(CloudFormationModel):
def __init__(
self,
cluster_id: str,
Expand Down Expand Up @@ -82,7 +82,7 @@ def __init__(
self.name = name
self.num_instances = instance_count
self.role = instance_role
self.type = instance_type
self.instance_type = instance_type
self.ebs_configuration = ebs_configuration
self.auto_scaling_policy = auto_scaling_policy
self.creation_datetime = datetime.now(timezone.utc)
Expand Down Expand Up @@ -122,6 +122,45 @@ def auto_scaling_policy(self, value: Any) -> None:
):
dimension["value"] = self.cluster_id

@property
def physical_resource_id(self) -> str:
return self.id

@staticmethod
def cloudformation_type() -> str:
return "AWS::EMR::InstanceGroupConfig"

@classmethod
def create_from_cloudformation_json( # type: ignore[misc]
cls,
resource_name: str,
cloudformation_json: Any,
account_id: str,
region_name: str,
**kwargs: Any,
) -> "FakeInstanceGroup":

properties = cloudformation_json["Properties"]
job_flow_id = properties["JobFlowId"]
ebs_config = properties.get("EbsConfiguration")
if ebs_config:
ebs_config = CamelToUnderscoresWalker.parse_dict(ebs_config)
props = {
"instance_count": properties.get("InstanceCount"),
"instance_role": properties.get("InstanceRole"),
"instance_type": properties.get("InstanceType"),
"market": properties.get("Market"),
"bid_price": properties.get("BidPrice"),
"name": properties.get("Name"),
"auto_scaling_policy": properties.get("AutoScalingPolicy"),
"ebs_configuration": ebs_config,
}

emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name]
return emr_backend.add_instance_groups(
cluster_id=job_flow_id, instance_groups=[props]
)[0]


class FakeStep(BaseModel):
def __init__(
Expand Down Expand Up @@ -292,11 +331,15 @@ def instance_groups(self) -> List[FakeInstanceGroup]:

@property
def master_instance_type(self) -> str:
return self.emr_backend.instance_groups[self.master_instance_group_id].type # type: ignore
return self.emr_backend.instance_groups[
self.master_instance_group_id # type: ignore
].instance_type

@property
def slave_instance_type(self) -> str:
return self.emr_backend.instance_groups[self.core_instance_group_id].type # type: ignore
return self.emr_backend.instance_groups[
self.core_instance_group_id # type: ignore
].instance_type

@property
def instance_count(self) -> int:
Expand Down
4 changes: 2 additions & 2 deletions moto/emr/responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ def remove_auto_scaling_policy(self) -> str:
<InstanceRequestCount>{{ instance_group.num_instances }}</InstanceRequestCount>
<InstanceRole>{{ instance_group.role }}</InstanceRole>
<InstanceRunningCount>{{ instance_group.num_instances }}</InstanceRunningCount>
<InstanceType>{{ instance_group.type }}</InstanceType>
<InstanceType>{{ instance_group.instance_type }}</InstanceType>
<LastStateChangeReason/>
<Market>{{ instance_group.market }}</Market>
<Name>{{ instance_group.name }}</Name>
Expand Down Expand Up @@ -1084,7 +1084,7 @@ def remove_auto_scaling_policy(self) -> str:
{% endif %}
<Id>{{ instance_group.id }}</Id>
<InstanceGroupType>{{ instance_group.role }}</InstanceGroupType>
<InstanceType>{{ instance_group.type }}</InstanceType>
<InstanceType>{{ instance_group.instance_type }}</InstanceType>
<Market>{{ instance_group.market }}</Market>
<Name>{{ instance_group.name }}</Name>
<RequestedInstanceCount>{{ instance_group.num_instances }}</RequestedInstanceCount>
Expand Down
192 changes: 192 additions & 0 deletions tests/test_emr/test_emr_cloudformation.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,3 +632,195 @@ def test_create_cluster_with_kerberos_attrs():
emr.describe_security_configuration(Name="mysecconfig")
err = exc.value.response["Error"]
assert err["Code"] == "InvalidRequestException"


template_with_simple_instance_group_config = {
"Resources": {
"Cluster1": {
"Type": "AWS::EMR::Cluster",
"Properties": {
"Instances": {
"CoreInstanceGroup": {
"InstanceCount": 3,
"InstanceType": "m3g",
}
},
"JobFlowRole": "EMR_EC2_DefaultRole",
"Name": "my cluster",
"ServiceRole": "EMR_DefaultRole",
},
},
"TestInstanceGroupConfig": {
"Type": "AWS::EMR::InstanceGroupConfig",
"Properties": {
"InstanceCount": 2,
"InstanceType": "m3.xlarge",
"InstanceRole": "TASK",
"Market": "ON_DEMAND",
"Name": "cfnTask2",
"JobFlowId": {"Ref": "Cluster1"},
},
},
},
}


@mock_aws
def test_create_simple_instance_group():
region = "us-east-1"
cf = boto3.client("cloudformation", region_name=region)
emr = boto3.client("emr", region_name=region)
cf.create_stack(
StackName="teststack",
TemplateBody=json.dumps(template_with_simple_instance_group_config),
)

# Verify resources
res = cf.describe_stack_resources(StackName="teststack")["StackResources"][0]
cluster_id = res["PhysicalResourceId"]

ig = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"][0]
assert ig["Name"] == "cfnTask2"
assert ig["Market"] == "ON_DEMAND"
assert ig["InstanceGroupType"] == "TASK"
assert ig["InstanceType"] == "m3.xlarge"


template_with_advanced_instance_group_config = {
"Resources": {
"Cluster1": {
"Type": "AWS::EMR::Cluster",
"Properties": {
"Instances": {
"CoreInstanceGroup": {
"InstanceCount": 3,
"InstanceType": "m3g",
}
},
"JobFlowRole": "EMR_EC2_DefaultRole",
"Name": "my cluster",
"ServiceRole": "EMR_DefaultRole",
},
},
"TestInstanceGroupConfig": {
"Type": "AWS::EMR::InstanceGroupConfig",
"Properties": {
"InstanceCount": 1,
"InstanceType": "m4.large",
"InstanceRole": "TASK",
"Market": "ON_DEMAND",
"Name": "cfnTask3",
"JobFlowId": {"Ref": "Cluster1"},
"EbsConfiguration": {
"EbsOptimized": True,
"EbsBlockDeviceConfigs": [
{
"VolumesPerInstance": 2,
"VolumeSpecification": {
"Iops": 10,
"SizeInGB": 50,
"Throughput": 100,
"VolumeType": "gp3",
},
}
],
},
"AutoScalingPolicy": {
"Constraints": {"MinCapacity": 1, "MaxCapacity": 4},
"Rules": [
{
"Name": "Scale-out",
"Description": "Scale-out policy",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 1,
"CoolDown": 300,
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"Dimensions": [
{
"Key": "JobFlowId",
"Value": "${emr.clusterId}",
}
],
"EvaluationPeriods": 1,
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"ComparisonOperator": "LESS_THAN",
"Statistic": "AVERAGE",
"Threshold": 15,
"Unit": "PERCENT",
"MetricName": "YARNMemoryAvailablePercentage",
}
},
},
{
"Name": "Scale-in",
"Description": "Scale-in policy",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": -1,
"CoolDown": 300,
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"Dimensions": [
{
"Key": "JobFlowId",
"Value": "${emr.clusterId}",
}
],
"EvaluationPeriods": 1,
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"ComparisonOperator": "GREATER_THAN",
"Statistic": "AVERAGE",
"Threshold": 75,
"Unit": "PERCENT",
"MetricName": "YARNMemoryAvailablePercentage",
}
},
},
],
},
},
},
},
}


@mock_aws
def test_create_advanced_instance_group():
region = "us-east-1"
cf = boto3.client("cloudformation", region_name=region)
emr = boto3.client("emr", region_name=region)
cf.create_stack(
StackName="teststack",
TemplateBody=json.dumps(template_with_advanced_instance_group_config),
)

# Verify resources
res = cf.describe_stack_resources(StackName="teststack")["StackResources"][0]
cluster_id = res["PhysicalResourceId"]

ig = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"][0]
assert ig["Name"] == "cfnTask3"
assert ig["Market"] == "ON_DEMAND"
assert ig["InstanceGroupType"] == "TASK"
assert ig["InstanceType"] == "m4.large"

as_policy = ig["AutoScalingPolicy"]
assert as_policy["Status"] == {"State": "ATTACHED"}
assert as_policy["Constraints"] == {"MinCapacity": 1, "MaxCapacity": 4}

ebs = ig["EbsBlockDevices"]
assert ebs[0]["VolumeSpecification"] == {
"VolumeType": "gp3",
"Iops": 10,
"SizeInGB": 50,
}

0 comments on commit cb60935

Please sign in to comment.