diff --git a/moto/emr/models.py b/moto/emr/models.py index c73923919e70..e26c03d8fa6b 100644 --- a/moto/emr/models.py +++ b/moto/emr/models.py @@ -53,7 +53,7 @@ def __init__( self.instance_fleet_id = instance_fleet_id -class FakeInstanceGroup(BaseModel): +class FakeInstanceGroup(CloudFormationModel): def __init__( self, cluster_id: str, @@ -82,7 +82,7 @@ def __init__( self.name = name self.num_instances = instance_count self.role = instance_role - self.type = instance_type + self.instance_type = instance_type self.ebs_configuration = ebs_configuration self.auto_scaling_policy = auto_scaling_policy self.creation_datetime = datetime.now(timezone.utc) @@ -122,6 +122,45 @@ def auto_scaling_policy(self, value: Any) -> None: ): dimension["value"] = self.cluster_id + @property + def physical_resource_id(self) -> str: + return self.id + + @staticmethod + def cloudformation_type() -> str: + return "AWS::EMR::InstanceGroupConfig" + + @classmethod + def create_from_cloudformation_json( # type: ignore[misc] + cls, + resource_name: str, + cloudformation_json: Any, + account_id: str, + region_name: str, + **kwargs: Any, + ) -> "FakeInstanceGroup": + + properties = cloudformation_json["Properties"] + job_flow_id = properties["JobFlowId"] + ebs_config = properties.get("EbsConfiguration") + if ebs_config: + ebs_config = CamelToUnderscoresWalker.parse_dict(ebs_config) + props = { + "instance_count": properties.get("InstanceCount"), + "instance_role": properties.get("InstanceRole"), + "instance_type": properties.get("InstanceType"), + "market": properties.get("Market"), + "bid_price": properties.get("BidPrice"), + "name": properties.get("Name"), + "auto_scaling_policy": properties.get("AutoScalingPolicy"), + "ebs_configuration": ebs_config, + } + + emr_backend: ElasticMapReduceBackend = emr_backends[account_id][region_name] + return emr_backend.add_instance_groups( + cluster_id=job_flow_id, instance_groups=[props] + )[0] + class FakeStep(BaseModel): def __init__( @@ -292,11 +331,15 @@ def instance_groups(self) -> List[FakeInstanceGroup]: @property def master_instance_type(self) -> str: - return self.emr_backend.instance_groups[self.master_instance_group_id].type # type: ignore + return self.emr_backend.instance_groups[ + self.master_instance_group_id # type: ignore + ].instance_type @property def slave_instance_type(self) -> str: - return self.emr_backend.instance_groups[self.core_instance_group_id].type # type: ignore + return self.emr_backend.instance_groups[ + self.core_instance_group_id # type: ignore + ].instance_type @property def instance_count(self) -> int: diff --git a/moto/emr/responses.py b/moto/emr/responses.py index 76df2920323a..39f66e9cc200 100644 --- a/moto/emr/responses.py +++ b/moto/emr/responses.py @@ -758,7 +758,7 @@ def remove_auto_scaling_policy(self) -> str: {{ instance_group.num_instances }} {{ instance_group.role }} {{ instance_group.num_instances }} - {{ instance_group.type }} + {{ instance_group.instance_type }} {{ instance_group.market }} {{ instance_group.name }} @@ -1084,7 +1084,7 @@ def remove_auto_scaling_policy(self) -> str: {% endif %} {{ instance_group.id }} {{ instance_group.role }} - {{ instance_group.type }} + {{ instance_group.instance_type }} {{ instance_group.market }} {{ instance_group.name }} {{ instance_group.num_instances }} diff --git a/tests/test_emr/test_emr_cloudformation.py b/tests/test_emr/test_emr_cloudformation.py index 84bef4b25e4c..b9c2751d56a8 100644 --- a/tests/test_emr/test_emr_cloudformation.py +++ b/tests/test_emr/test_emr_cloudformation.py @@ -632,3 +632,195 @@ def test_create_cluster_with_kerberos_attrs(): emr.describe_security_configuration(Name="mysecconfig") err = exc.value.response["Error"] assert err["Code"] == "InvalidRequestException" + + +template_with_simple_instance_group_config = { + "Resources": { + "Cluster1": { + "Type": "AWS::EMR::Cluster", + "Properties": { + "Instances": { + "CoreInstanceGroup": { + "InstanceCount": 3, + "InstanceType": "m3g", + } + }, + "JobFlowRole": "EMR_EC2_DefaultRole", + "Name": "my cluster", + "ServiceRole": "EMR_DefaultRole", + }, + }, + "TestInstanceGroupConfig": { + "Type": "AWS::EMR::InstanceGroupConfig", + "Properties": { + "InstanceCount": 2, + "InstanceType": "m3.xlarge", + "InstanceRole": "TASK", + "Market": "ON_DEMAND", + "Name": "cfnTask2", + "JobFlowId": {"Ref": "Cluster1"}, + }, + }, + }, +} + + +@mock_aws +def test_create_simple_instance_group(): + region = "us-east-1" + cf = boto3.client("cloudformation", region_name=region) + emr = boto3.client("emr", region_name=region) + cf.create_stack( + StackName="teststack", + TemplateBody=json.dumps(template_with_simple_instance_group_config), + ) + + # Verify resources + res = cf.describe_stack_resources(StackName="teststack")["StackResources"][0] + cluster_id = res["PhysicalResourceId"] + + ig = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"][0] + assert ig["Name"] == "cfnTask2" + assert ig["Market"] == "ON_DEMAND" + assert ig["InstanceGroupType"] == "TASK" + assert ig["InstanceType"] == "m3.xlarge" + + +template_with_advanced_instance_group_config = { + "Resources": { + "Cluster1": { + "Type": "AWS::EMR::Cluster", + "Properties": { + "Instances": { + "CoreInstanceGroup": { + "InstanceCount": 3, + "InstanceType": "m3g", + } + }, + "JobFlowRole": "EMR_EC2_DefaultRole", + "Name": "my cluster", + "ServiceRole": "EMR_DefaultRole", + }, + }, + "TestInstanceGroupConfig": { + "Type": "AWS::EMR::InstanceGroupConfig", + "Properties": { + "InstanceCount": 1, + "InstanceType": "m4.large", + "InstanceRole": "TASK", + "Market": "ON_DEMAND", + "Name": "cfnTask3", + "JobFlowId": {"Ref": "Cluster1"}, + "EbsConfiguration": { + "EbsOptimized": True, + "EbsBlockDeviceConfigs": [ + { + "VolumesPerInstance": 2, + "VolumeSpecification": { + "Iops": 10, + "SizeInGB": 50, + "Throughput": 100, + "VolumeType": "gp3", + }, + } + ], + }, + "AutoScalingPolicy": { + "Constraints": {"MinCapacity": 1, "MaxCapacity": 4}, + "Rules": [ + { + "Name": "Scale-out", + "Description": "Scale-out policy", + "Action": { + "SimpleScalingPolicyConfiguration": { + "AdjustmentType": "CHANGE_IN_CAPACITY", + "ScalingAdjustment": 1, + "CoolDown": 300, + } + }, + "Trigger": { + "CloudWatchAlarmDefinition": { + "Dimensions": [ + { + "Key": "JobFlowId", + "Value": "${emr.clusterId}", + } + ], + "EvaluationPeriods": 1, + "Namespace": "AWS/ElasticMapReduce", + "Period": 300, + "ComparisonOperator": "LESS_THAN", + "Statistic": "AVERAGE", + "Threshold": 15, + "Unit": "PERCENT", + "MetricName": "YARNMemoryAvailablePercentage", + } + }, + }, + { + "Name": "Scale-in", + "Description": "Scale-in policy", + "Action": { + "SimpleScalingPolicyConfiguration": { + "AdjustmentType": "CHANGE_IN_CAPACITY", + "ScalingAdjustment": -1, + "CoolDown": 300, + } + }, + "Trigger": { + "CloudWatchAlarmDefinition": { + "Dimensions": [ + { + "Key": "JobFlowId", + "Value": "${emr.clusterId}", + } + ], + "EvaluationPeriods": 1, + "Namespace": "AWS/ElasticMapReduce", + "Period": 300, + "ComparisonOperator": "GREATER_THAN", + "Statistic": "AVERAGE", + "Threshold": 75, + "Unit": "PERCENT", + "MetricName": "YARNMemoryAvailablePercentage", + } + }, + }, + ], + }, + }, + }, + }, +} + + +@mock_aws +def test_create_advanced_instance_group(): + region = "us-east-1" + cf = boto3.client("cloudformation", region_name=region) + emr = boto3.client("emr", region_name=region) + cf.create_stack( + StackName="teststack", + TemplateBody=json.dumps(template_with_advanced_instance_group_config), + ) + + # Verify resources + res = cf.describe_stack_resources(StackName="teststack")["StackResources"][0] + cluster_id = res["PhysicalResourceId"] + + ig = emr.list_instance_groups(ClusterId=cluster_id)["InstanceGroups"][0] + assert ig["Name"] == "cfnTask3" + assert ig["Market"] == "ON_DEMAND" + assert ig["InstanceGroupType"] == "TASK" + assert ig["InstanceType"] == "m4.large" + + as_policy = ig["AutoScalingPolicy"] + assert as_policy["Status"] == {"State": "ATTACHED"} + assert as_policy["Constraints"] == {"MinCapacity": 1, "MaxCapacity": 4} + + ebs = ig["EbsBlockDevices"] + assert ebs[0]["VolumeSpecification"] == { + "VolumeType": "gp3", + "Iops": 10, + "SizeInGB": 50, + }