diff --git a/content/running_spark_apps_with_emr_on_spot_instances/emr_instance_fleets.md b/content/running_spark_apps_with_emr_on_spot_instances/emr_instance_fleets.md index 6ac74289..8b34bf4e 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/emr_instance_fleets.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/emr_instance_fleets.md @@ -2,26 +2,20 @@ title: "EMR Instance Fleets" weight: 30 --- +The **[instance fleet](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html)** configuration for Amazon EMR clusters lets you select a wide variety of provisioning options for Amazon EC2 instances, and helps you develop a flexible and elastic resourcing strategy for each node type in your cluster. You can have only one instance fleet per master, core, and task node type. In an instance fleet configuration, you specify a *target capacity* for **[On-Demand Instances](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-on-demand-instances.html)** and **[Spot Instances](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-spot-instances.html)** within each fleet. -When adopting Spot Instances into your workload, it is recommended to be flexible around how to launch your workload in terms of Availability Zone and Instance Types. This is in order to be able to achieve the required scale from multiple Spot capacity pools (a combination of EC2 instance type in an availability zone) or one capacity pool which has sufficient capacity, as well as decrease the impact on your workload in case some of the Spot capacity is interrupted with a 2-minute notice when EC2 needs the capacity back, and allow EMR to replenish the capacity with a different instance type. +When adopting Spot Instances into your workload, it is recommended to be flexible with the selection of instance types across families, generations, sizes, and Availability Zones. With higher instance type diversification, Amazon EMR has more capacity pools to allocate capacity from, and chooses the Spot Instances which are least likely to be interrupted. -With EMR instance fleets, you specify target capacities for On-Demand Instances and Spot Instances within each fleet (Master, Core, Task). When the cluster launches, Amazon EMR provisions instances until the targets are fulfilled. You can specify up to five EC2 instance types per fleet for Amazon EMR to use when fulfilling the targets. You can also select multiple subnets for different Availability Zones. - -{{% notice info %}} -**[Click here](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html)** to learn more about EMR Instance Fleets in the official documentation. +{{% notice note %}} +EMR allows a maximum of five instance types per fleet when you use the default Amazon EMR cluster instance fleet configuration. However, you can specify a maximum of 15 instance types per fleet when you create a cluster using the EMR console and enable the **[allocation strategy](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-instance-fleet.html#emr-instance-fleet-allocation-strategy)** option. This limit is further increased to 30 when you create a cluster using the AWS CLI or Amazon EMR API and enable the allocation strategy option. {{% /notice %}} -While a cluster is running, if Amazon EC2 reclaims a Spot Instance or if an instance fails, Amazon EMR tries to replace the instance with any of the instance types that you specify in your fleet.
This makes it easier to regain capacity in case some of the instances get interrupted by EC2 when it needs the Spot capacity back. +As an enhancement to the default EMR instance fleets cluster configuration, the allocation strategy feature is available in EMR version **5.12.1 and later**. With allocation strategy, an EMR instance fleet with: -These options do not exist within the default EMR configuration option "Uniform Instance Groups", hence we will be using EMR Instance Fleets only. +* On-Demand Instances uses a lowest-priced strategy, which launches the lowest-priced On-Demand Instances first. +* Spot Instances uses a **[capacity-optimized](https://aws.amazon.com/about-aws/whats-new/2020/06/amazon-emr-uses-real-time-capacity-insights-to-provision-spot-instances-to-lower-cost-and-interruption/)** allocation strategy, which allocates instances from most-available Spot Instance pools and lowers the chance of further interruptions. This allocation strategy is appropriate for workloads that have a higher cost of interruption such as persistent EMR clusters running Apache Spark, Apache Hive, and Presto. -As an enhancement to the default EMR instance fleets cluster configuration, the allocation strategy feature is available in EMR version **5.12.1 and later**. With allocation strategy: -* On-Demand instances use a lowest-price strategy, which launches the lowest-priced instances first. -* Spot instances use a **[capacity-optimized](https://aws.amazon.com/about-aws/whats-new/2020/06/amazon-emr-uses-real-time-capacity-insights-to-provision-spot-instances-to-lower-cost-and-interruption/)** allocation strategy, which allocates instances from most-available Spot Instance pools and lowers the chance of interruptions. This allocation strategy is appropriate for workloads that have a higher cost of interruption such as persistent EMR clusters running Apache Spark, Apache Hive, and Presto. - -{{% notice note %}} -This allocation strategy option also lets you specify **up to 15 EC2 instance types on task instance fleet**. By default, Amazon EMR allows a maximum of 5 instance types for each type of instance fleet. By enabling allocation strategy, you can diversify your Spot request for task instance fleet across 15 instance pools. With more instance type diversification, Amazon EMR has more capacity pools to allocate capacity from, this allows you to get more compute capacity. -{{% /notice %}} +These options do not exist within the default EMR configuration option, uniform instance groups, hence we recommend using instance fleets with Spot Instances. {{% notice info %}} **[Click here](https://aws.amazon.com/blogs/big-data/optimizing-amazon-emr-for-resilience-and-cost-with-capacity-optimized-spot-instances/)** for an in-depth blog post about capacity-optimized allocation strategy for Amazon EMR instance fleets. diff --git a/content/running_spark_apps_with_emr_on_spot_instances/examining_cluster.md b/content/running_spark_apps_with_emr_on_spot_instances/examining_cluster.md index b3c782e0..96a5be75 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/examining_cluster.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/examining_cluster.md @@ -1,6 +1,6 @@ --- title: "Examining the cluster" -weight: 95 +weight: 90 --- In this section we will look at the utilization of our EC2 Spot Instances while the application is running, and examine how many Spark executors are running.
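A quick way to see what EMR actually provisioned is to query the cluster's fleets from the AWS CLI. The sketch below is an optional aid and is not part of the original workshop steps; the cluster ID `j-XXXXXXXXXXXXX` is a placeholder for your own cluster ID from the EMR console.

```bash
# Optional sketch: list the running instances in the task fleet of your cluster,
# with their EC2 instance ID, instance type and market (SPOT or ON_DEMAND).
# Replace j-XXXXXXXXXXXXX with your cluster ID.
aws emr list-instances \
  --cluster-id j-XXXXXXXXXXXXX \
  --instance-fleet-type TASK \
  --instance-states RUNNING \
  --query 'Instances[].[Ec2InstanceId,InstanceType,Market]' \
  --output table
```

Running the same command with `--instance-fleet-type CORE` or `MASTER` should show the other fleets, which is handy for cross-checking the container counts you will see in the CloudWatch metrics below.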
@@ -78,21 +78,30 @@ Now, let's look at **Spark History Server** application user interface: ### Using CloudWatch Metrics EMR emits several useful metrics to CloudWatch metrics. You can use the AWS Management Console to look at the metrics in two ways: -1. In the EMR console, under the Monitoring tab in your Cluster's page +1. In the EMR console, under the **Monitoring** tab in your cluster's page 2. By browsing to the CloudWatch service, and under Metrics, searching for the name of your cluster (copy it from the EMR Management Console) and clicking **EMR > Job Flow Metrics** {{% notice note %}} The metrics will take a few minutes to populate. {{% /notice %}} -Some notable metrics: +Some notable metrics: + * **AppsRunning** - you should see 1 since we only submitted one step to the cluster. -* **ContainerAllocated** - this represents the number of containers that are running on your cluster, on the Core and Task Instance Fleets. These would the be Spark executors and the Spark Driver. -* **MemoryAllocatedMB** & **MemoryAvailableMB** - you can graph them both to see how much memory the cluster is actually consuming for the wordcount Spark application out of the memory that the instances have. +* **ContainerAllocated** - this represents the number of containers that are running on core and task fleets. These would be the Spark executors and the Spark Driver. +* **Memory allocated MB** & **Memory available MB** - you can graph them both to see how much memory the cluster is actually consuming for the wordcount Spark application out of the memory that the instances have. + +#### Managed Scaling in Action + +You enabled managed cluster scaling and EMR scaled out to 64 Spot units in the task fleet. EMR could have launched either 16 * xlarge (running one executor per xlarge) or 8 * 2xlarge instances (running 2 executors per 2xlarge) or 4 * 4xlarge instances (running 4 executors per 4xlarge), so the task fleet provides 16 executors / containers to the cluster. The core fleet launched one xlarge instance and it will run one executor / container, so in total 17 executors / containers will be running in the cluster. + + +1. In your EMR cluster page, in the AWS Management Console, go to the **Steps** tab. +1. Go to the **Events** tab to see the scaling events. +![scalingEvent](/images/running-emr-spark-apps-on-spot/emrsparkscalingevent.png) + +EMR Managed cluster scaling constantly monitors [key metrics](https://docs.aws.amazon.com/emr/latest/ManagementGuide/managed-scaling-metrics.html) and automatically increases or decreases the number of instances or units in your cluster based on workload. -#### Number of executors in the cluster -With 32 Spot Units in the Task Instance Fleet, EMR launched either 8 * xlarge (running one executor) or 4 * 2xlarge instances (running 2 executors) or 2 * 4xlarge instances (running 4 executors), so the Task Instance Fleet provides 8 executors / containers to the cluster. -The Core Instance Fleet launched one xlarge instance, able to run one executor. -{{%expand "Question: Did you see more than 9 containers in CloudWatch Metrics and in YARN ResourceManager? if so, do you know why? Click to expand the answer" %}} +{{%expand "Question: Did you see more than 17 containers in CloudWatch Metrics and in YARN ResourceManager? if so, do you know why? Click to expand the answer" %}} Your Spark application was configured to run in Cluster mode, meaning that the **Spark driver is running on the Core node**.
Since it is counted as a container, this adds a container to our count, but it is not an executor. -{{% /expand%}} +{{% /expand%}} \ No newline at end of file diff --git a/content/running_spark_apps_with_emr_on_spot_instances/fleet_config_options.md b/content/running_spark_apps_with_emr_on_spot_instances/fleet_config_options.md index 1aaf4670..38698e0d 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/fleet_config_options.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/fleet_config_options.md @@ -3,16 +3,17 @@ title: "Fleet configuration options" weight: 85 --- -While our cluster is starting (7-8 minutes) and the step is running (4-10 minutes depending on the instance types that were selected) let's take the time to look at some of the EMR Instance Fleets configurations we didn't dive into when starting the cluster. +While our cluster is starting (7-8 minutes) and the step is running (4-10 minutes depending on the instance types that were selected) let's take the time to look at some of the EMR instance fleets configurations we didn't dive into when starting the cluster. -![fleetconfigs](/images/running-emr-spark-apps-on-spot/emrinstancefleets-core1.png) +![fleetconfigs](/images/running-emr-spark-apps-on-spot/emrinstancefleets-core.png) #### Maximum Spot price -Since Nov 2017, Amazon EC2 Spot Instances [changed the pricing model and bidding was eliminated](https://aws.amazon.com/blogs/compute/new-amazon-ec2-spot-pricing/). We have an optional "Max-price" field for our Spot requests, which would limit how much we're willing to pay for the instance. It is recommended to leave this value at 100% of the On-Demand price, in order to avoid limiting our instance diversification. We are going to pay the Spot market price regardless of the Maximum price that we can specify, and setting a higher max price does not increase the chance of getting Spot capacity nor does it decrease the chance of getting your Spot Instances interrupted when EC2 needs the capacity back. You can see the current Spot price in the AWS Management Console under EC2 -> Spot Requests -> **Pricing History**. +Since Nov 2017, Amazon EC2 Spot Instances [changed the pricing model and bidding was eliminated](https://aws.amazon.com/blogs/compute/new-amazon-ec2-spot-pricing/). We have an optional "Max-price" field for our Spot requests, which would limit how much we're willing to pay for the instance. It is recommended to leave this value at 100% of the On-Demand price, in order to avoid limiting our instance diversification. We are going to pay the Spot market price regardless of the Maximum price that we can specify, and setting a higher max price does not increase the chance of getting Spot capacity nor does it decrease the chance of getting your Spot Instances interrupted when EC2 needs the capacity back. You can see the current Spot price in the AWS Management Console under **EC2 >> Spot Requests >> Pricing History**. #### Each instance counts as X units -This configuration allows us to give each instance type in our diversified fleet a weight that will count towards our Total units. By default, this weight is configured as the number of YARN VCores that the instance type has by default (this would typically equate to the number of EC2 vCPUs) - this way it's easy to set the Total units to the number of vCPUs we want our cluster to run with, and EMR will select the best instances while taking into account the required number of instances to run. 
For example, if r4.xlarge is the instance type that EMR found to be the least likely to be interrupted, its weight is 4 and our total units (only Spot) is 32, then 8 * r4.xlarge instances will be launched by EMR in the fleet. -If my Spark application is memory driven, I can set the total units to the total amount of memory I want my cluster to run with, and change the "Each instance counts as" field to the total memory of the instance, leaving aside some memory for the operating system and other processes. For example, for the r4.xlarge I can set its weight to 25. If I then set up the Total units to 500 then EMR will bring up 20 * r4.xlarge instances in the fleet. Since our executor size is 18 GB, one executor will run on this instance type. + This configuration allows us to give each instance type in our diversified fleet a weight that will count towards our **Target capacity**. By default, this weight is configured as the number of YARN VCores that the instance type has by default, this would typically equate to the number of EC2 vCPUs. For example, r4.xlarge has a default weight set to 4 units and you have set the **Target capacity** for task fleet to 32. If EMR picks r4.xlarge as the most available Spot Instance, then 8 * r4.xlarge instances will be launched by EMR in the task fleet. + +If your Spark application is memory driven, you can set the **Target capacity** to the total amount of memory you want the cluster to run with. You can change the **Each instance counts as** field to the total memory of the instance, leaving aside some memory for the operating system and other processes. For example, for the r4.xlarge you can set **Each instance counts as** 18 units and set **Target capacity** to 144. If EMR picks r4.xlarge as the most available Spot Instance, then 8 * r4.xlarge instances will be launched by EMR in the task fleet. Since your executor size is 18 GB, one executor will run on each r4.xlarge instance. #### Provisioning timeout You can determine that after a set amount of minutes, if EMR is unable to provision your selected Spot Instances due to lack of capacity, it will either start On-Demand instances instead, or terminate the cluster. This can be determined according to the business definition of the cluster or Spark application - if it is SLA bound and should complete even at On-Demand price, then the "Switch to On-Demand" option might be suitable. However, make sure you diversify the instance types in the fleet when looking to use Spot Instances, before you look into failing over to On-Demand. \ No newline at end of file diff --git a/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-1.md b/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-1.md index ae3d53ff..0cc5c2ad 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-1.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-1.md @@ -3,48 +3,58 @@ title: "Launch a cluster - Step 1" weight: 60 --- -In this step we'll launch our first cluster, which will run solely on Spot Instances. We will also submit an EMR step for a simple wordcount Spark application which will run against a public dataset of Amazon product reviews, located in an Amazon S3 bucket in the N. Virginia region. 
If you want to know more about the Amazon Customer Reviews Dataset, [click here] (https://s3.amazonaws.com/amazon-reviews-pds/readme.html) +In this step, you will first launch an EMR cluster that runs on a mix of On-Demand and Spot Instances. Next, you will add a step that runs a simple wordcount Spark application against a public dataset of Amazon product reviews located in an Amazon S3 bucket in the N. Virginia region. + {{% notice note %}} -Normally our dataset on S3 would be located in the same region where we are going to run our EMR clusters. In this workshop, it is fine if you are running EMR in a different region, and the Spark application will work against the dataset which is located in the N. Virginia region. This will be negligible in terms of price and performance. +Ideally, your dataset on S3 would be located in the same region where you run your EMR clusters. In this workshop you might run EMR in a different region than N. Virginia; however, the Spark application will work without any issues with the dataset located in the S3 bucket in the N. Virginia region. If you want to know more about the Amazon Customer Reviews Dataset, [click here](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) {{% /notice %}} To launch the cluster, follow these steps: 1. [Open the EMR console](https://console.aws.amazon.com/elasticmapreduce/home) in the region where you are looking to launch your cluster. -1. Click "**Create Cluster**" +1. Click "**Create cluster**" 1. Click "**Go to advanced options**" -1. Select the latest EMR 5.x.x release (the console will default to it), and in the list of components, only leave **Hadoop** checked and also check **Spark** and **Ganglia** (we will use it later to monitor our cluster) -1. Under "**Steps (Optional)**" -> Step type drop down menu, select "**Spark application**" and click **Add step**, then add the following details in the Add step dialog window: +1. Select the latest EMR 5.x.x release (the console will default to it), and in the list of components, only select **Hadoop**, **Spark** and **Ganglia**. You will use Ganglia to monitor your cluster resource utilization. +1. Skip to the "**Steps (Optional)**" section and add a step as per the instructions below: -* **Spark-submit options**: here we will configure the memory and core count for each executor, as described in the previous section. Use these settings (make sure you have two '-' chars): -``` ---executor-memory 18G --executor-cores 4 -``` -* **Application location**: here we will configure the location of our Spark application. Save the following python code to a file (or download it from the Attachment box) and upload it to your S3 bucket using the AWS management console. You can refer to the [S3 Getting Started guide](https://docs.aws.amazon.com/AmazonS3/latest/gsg/PuttingAnObjectInABucket.html) for detailed instructions +* For the option **After last step completes** select **Clusters enters waiting state**. Since you will be examining the cluster during and after the Spark application, you can terminate the cluster at the end of the workshop manually instead of EMR auto-terminating immediately after the completion of the step.
-```python -import sys -from pyspark.sql import SparkSession -spark = SparkSession.builder.appName('Amazon reviews word count').getOrCreate() -df = spark.read.parquet("s3://amazon-reviews-pds/parquet/") -df.selectExpr("explode(split(lower(review_body), ' ')) as words").groupBy("words").count().write.mode("overwrite").parquet(sys.argv[1]) -exit() -``` -{{%attachments style="orange" /%}} + {{% notice note %}} + **Cluster auto-terminates** (terminating the cluster after the last step is completed) is a powerful EMR feature that is used for running transient clusters. This is an effective model for clusters that perform periodic processing tasks, such as a daily data processing run, event-driven ETL workloads, etc. + You will not be running a transient cluster, since it might terminate before you complete some of the next steps in the workshop. + {{% /notice %}} +* In the **Step type** drop down menu, select **Spark application** and click on **Add step**, then add the following details in the **Add step** dialog window: -Then add the location of the file under the **Application location** field, i.e: s3://\/script.py + * **Spark-submit options**: here you configure the memory and core count for each executor, as described in the previous section. Use these settings (make sure you have two '-' chars): + ``` + --executor-memory 18G --executor-cores 4 + ``` + * **Application location**: here you configure the location of the Spark application. Save the following python code to a file (or download it from the Attachment box) and upload it to your S3 bucket using the AWS management console. You can refer to the [S3 Getting Started guide](https://docs.aws.amazon.com/AmazonS3/latest/gsg/PuttingAnObjectInABucket.html) for detailed instructions -* **Arguments**: Here we will configure the location of where Spark will write the results of the job. Enter: s3://\/results/ -* **Action on failure**: Leave this on *Continue* and click **Add** to save the step. + ```python + import sys + from pyspark.sql import SparkSession + spark = SparkSession.builder.appName('Amazon reviews word count').getOrCreate() + df = spark.read.parquet("s3://amazon-reviews-pds/parquet/") + df.selectExpr("explode(split(lower(review_body), ' ')) as words").groupBy("words").count().write.mode("overwrite").parquet(sys.argv[1]) + exit() + ``` + {{%attachments style="orange" /%}} -![sparksubmit](/images/running-emr-spark-apps-on-spot/sparksubmitstep1.png) -In the **After last step completes** selection, make sure that the "**Clusters enters waiting state**" option is checked. Since we are looking to examine the cluster during and after the Spark application run, we might end up with a terminated cluster before we complete the next steps in the workshop, if we opt to auto-terminate the cluster after our step is completed. -{{% notice note %}} -**Auto-terminate cluster after the last step is completed** is a powerful EMR feature that is used for running transient clusters. This is an effective model for clusters that perform periodic processing tasks, such as a daily data processing run, event-driven ETL workloads, etc. -We will not be running a transient cluster, since it might terminate before we complete some of the next steps in the workshop. -{{% /notice %}} + * Next, add the location of the file under the **Application location** field, i.e: s3://\/script.py + * **Arguments**: Here you will configure the location of where Spark will write the results of the job.
Enter: s3://\/results/ + * **Action on failure**: Leave this as **Continue** + + + + ![sparksubmit](/images/running-emr-spark-apps-on-spot/sparksubmitstep1.png) + + * Click **Add** to save the step. Click **Next** to continue setting up the EMR cluster and move from "**Step 1: Software and steps**" to "**Step 2: Hardware**". + + + diff --git a/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-2.md b/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-2.md index 39e103bf..6a48346a 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-2.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/launching_emr_cluster-2.md @@ -2,36 +2,76 @@ title: "Launch a cluster - Step 2" weight: 70 --- +### Hardware Configuration + +* Under **Cluster Composition** >> **Instance group configuration**, select Instance fleets. + +* Under **Network**, select the VPC that you deployed using the CloudFormation template earlier in the workshop (or the default VPC if you're running the workshop in an AWS event), and select all subnets in the VPC. -Under "**Instance group configuration**", select Instance Fleets. Under Network, select the VPC that you deployed using the CloudFormation template earlier in the workshop (or the default VPC if you're running the workshop in an AWS event), and select all subnets in the VPC. When you select multiple subnets, the EMR cluster will still be started in a single Availability Zone, but EMR Instance Fleets will make the best instance type selection based on available capacity and price across the multiple availability zones that you specified. Aslo, click on the checkbox "Apply allocation strategy" to leverage lowest-price allocation for On-Demand Instances and Capacity-Optimized allocation for Spot Instances; this will also allow you to configure up to 15 instance types on the Task Instance fleet. ![FleetSelection1](/images/running-emr-spark-apps-on-spot/emrinstancefleetsnetwork.png) +{{% notice note %}} +We recommend that you provide a list of subnets (Availability Zones) and instance types in instance fleets; Amazon EMR will automatically select one optimal subnet (AZ) based on the cost and availability of instance types. +{{% /notice %}} + +* Click on the checkbox **Apply allocation strategy** to leverage the lowest-priced allocation strategy for On-Demand Instances and the capacity-optimized allocation strategy for Spot Instances. Allocation strategy will also allow you to configure up to 15 instance types on the Task Instance fleet in the EMR console and up to 30 instance types when you create the cluster with the AWS CLI or EMR API. + +![allocationstrategy](/images/running-emr-spark-apps-on-spot/allocation-strategy.png) + +### Cluster Nodes and Instances + +#### Master fleet +The master node does not typically have large computational requirements. For clusters with a large number of nodes, or for clusters with applications that are specifically deployed on the master node (JupyterHub, Hue, etc.), a larger master node may be required and can help improve cluster performance. For example, consider using a General Purpose m5.xlarge instance for small clusters (50 or fewer nodes), and increasing to a larger instance type for larger clusters. -### Setting up our EMR Master node, and Core / Task Instance Fleets {{% notice note %}} -The workshop focuses on running Spot Instances across all the cluster node types for cost savings.
If you want to dive deeper into when to use On-Demand and Spot in your EMR clusters, **[click here](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-instances-guidelines.html#emr-plan-spot-instances)** +You may experience insufficient capacity when using On-Demand Instances with allocation strategy for instance fleets. We recommend specifying a larger number of instance types for On-Demand Instances as well, to diversify and reduce the chance of experiencing insufficient capacity. {{% /notice %}} -#### **Master node**: -Unless your cluster is very short-lived and the runs are cost-driven, avoid running your Master node on a Spot Instance. We suggest this because a Spot interruption on the Master node terminates the entire cluster. -For the purpose of this workshop, we will run the Master node on a Spot Instance as we simulate a relatively short lived job running on a transient cluster. There will not be business impact if the job fails due to a Spot interruption and later re-started. -Click **Add / remove instance types to fleet** and select two relatively cheaper instance types - i.e c5.xlarge and m5.xlarge and check Spot under target capacity. EMR will only provision one instance, but will select the best instance type for the Master node from the Spot instance pools with the optimal capacity. +Under **Node type** >> **Master**, click **Add / remove instance types to fleet** and select General Purpose instance types - i.e. m4.xlarge, m5.xlarge, m5a.xlarge and m5d.xlarge. EMR will only provision one instance, but will select the cheapest On-Demand instance type for the Master node from the given instance types. ![FleetSelection1](/images/running-emr-spark-apps-on-spot/emrinstancefleets-master.png) +{{% notice warning %}} +Unless your cluster is very short-lived and the runs are cost-driven, avoid running your Master node on a Spot Instance. We suggest this because a Spot interruption on the Master node terminates the entire cluster. If you want to dive deeper into when to use On-Demand and Spot in your EMR clusters, **[click here](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-instances-guidelines.html#emr-plan-spot-instances)** +{{% /notice %}} + +#### Core fleet +When using EMR instance fleets, one core node is mandatory. Since we don't use HDFS in this workshop, you will auto-scale the task fleet and keep only the one mandatory core node, using On-Demand Instances. Specify **4 On-Demand units** to allow the single core node to run one executor and the YARN application master. -#### **Core Instance Fleet**: -Avoid using Spot Instances for Core nodes if your Spark applications use HDFS. That prevents a situation where Spot interruptions cause data loss for data that was written to the HDFS volumes on the instances. For short-lived applications on transient clusters, as is the case in this workshop, we are going to run our Core nodes on Spot Instances. -When using EMR Instance Fleets, one Core node is mandatory. Since we want to scale out and run our Spark application on our Task nodes, let's stick to the one mandatory Core node. We will specify **4 Spot units**, and select instance types that count as 4 units and will allow to run one executor.
-Under the core node type, click **Add / remove instance types to fleet** and select instance types that you noted before as suitable to run an executor (given the 18G executor size), for example: -![FleetSelection2](/images/running-emr-spark-apps-on-spot/emrinstancefleets-core1.png) +Under **Node type** >> **Core**, click **Add / remove instance types to fleet** and select five instance types that you noted before as suitable to run an executor (given the 18G executor size), for example: +![FleetSelection2](/images/running-emr-spark-apps-on-spot/emrinstancefleets-core.png) -#### **Task Instance Fleet**: -Our task nodes will only run Spark executors and no HDFS DataNodes, so this is a great fit for scaling out and increasing the parallelization of our application's execution, to achieve faster execution times. -Under the task node type, click **Add / remove instance types to fleet** and select **up to 15 instance types** you noted before as suitable for your executor size. -Since our executor size is 4 vCPUs, and each instance counts as the number of its vCPUs towards the total units, let's specify **32 Spot units** in order to run 8 executors, and allow EMR to select the best instance type in the Task Instance Fleet to run the executors on. +{{% notice warning %}} +Core nodes process data and store information using HDFS, so terminating a core instance risks data loss. The YARN application master runs on one of the core nodes; in the case of Spark applications, the Spark driver runs in the YARN application master container hosted on the core node. The Spark driver is a single point of failure in Spark applications: if the driver dies, all other linked components will be discarded as well. +{{% /notice %}} -![FleetSelection3](/images/running-emr-spark-apps-on-spot/emrinstancefleets-task2.png) +#### Task fleet +Task nodes run only Spark executors and no HDFS DataNodes, so they are a great fit for scaling out and increasing parallel executions to achieve faster execution times. -click **Next** to continue to the next steps of launching your EMR cluster. +Under **Node type** >> **Task**, click **Add / remove instance types to fleet** and select **up to 15 instance types** you noted before as suitable for your executor size. Since the executor size is 4 vCores, let's specify **32 Spot units** in order to run 8 executors to start with. +![FleetSelection3](/images/running-emr-spark-apps-on-spot/emrinstancefleets-task.png) + +### Enabling cluster scaling + +While you can always manually adjust the number of core or task nodes (EC2 instances) in your Amazon EMR cluster, you can also use the power of EMR auto-scaling to automatically adjust the cluster size in response to changing workloads without any manual intervention. + +Let's enable scaling for this cluster using **[Amazon EMR Managed Scaling](https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-managed-scaling-automatically-resize-clusters-to-lower-cost/)**. With EMR Managed Scaling, you specify the minimum and maximum compute limits for your cluster and Amazon EMR automatically resizes EMR clusters for best performance and resource utilization. EMR Managed Scaling constantly monitors key metrics based on workload and optimizes the cluster size for best resource utilization. + +{{% notice note %}} +EMR Managed Scaling is supported for Apache Spark, Apache Hive and YARN-based workloads on Amazon EMR versions 5.30.1 and above. +{{% /notice %}} + +1. Select the checkbox for **Enable Cluster Scaling** in the **Cluster scaling** section. +1.
Set **MinimumCapacityUnits** to **4**. When the cluster is not running any jobs, you can allow it to scale down the task fleet to **0** and keep only the single core node, which equals **4 On-Demand units**. +1. Set **MaximumCapacityUnits** to **68**. You start the cluster with a core fleet with a single node (**4 On-Demand units**) and a task fleet with **32 Spot units**. You can allow EMR to further scale out the task fleet to twice the initial size (**64 Spot units**) and keep the core fleet as is. Therefore, MaximumCapacityUnits equals 4 units of core nodes + 64 units of task nodes = **68 units**. +1. Set **MaximumOnDemandCapacityUnits** to **4**, allowing only core nodes to run on On-Demand Instances. +1. Set **MaximumCoreCapacityUnits** to **4**, allowing only a single core node and scaling out using task nodes. +![emrmanagedscaling](/images/running-emr-spark-apps-on-spot/emrmanagedscaling.png) + +{{% notice note %}} +Managed Scaling now also has the capability to prevent scaling down instances that store intermediate shuffle data for Apache Spark. Intelligently scaling down clusters without removing the instances that store intermediate shuffle data prevents job re-attempts and re-computations, which leads to better performance and lower cost. +**[Click here](https://aws.amazon.com/about-aws/whats-new/2022/03/amazon-emr-managed-scaling-shuffle-data-aware/)** for more details. +{{% /notice %}} + +Click **Next** to continue to the next steps of launching your EMR cluster. diff --git a/content/running_spark_apps_with_emr_on_spot_instances/selecting_instance_types.md b/content/running_spark_apps_with_emr_on_spot_instances/selecting_instance_types.md index e84e7dc9..0e58a89a 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/selecting_instance_types.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/selecting_instance_types.md @@ -3,48 +3,48 @@ title: "Selecting instance types" weight: 50 --- -Let's use our newly acquired knowledge around Spark executor sizing in order to select the EC2 Instance Types that will be used in our EMR cluster. We determined that in order to be flexible and allow running on multiple instance types, we will submit our Spark application with **"–executor-memory=18GB –executor-cores=4"**. +Let's use our newly acquired knowledge around Spark executor sizing in order to select the EC2 instance types that will be used in our EMR cluster. We determined that in order to be flexible and allow running on multiple instance types, we will submit our Spark application with **"–executor-memory=18GB –executor-cores=4"**. -To apply the instance diversification best practices while meeting the application constraints defined in the previous section, we can add different instances sizes from the current generation, such as R5 and R4. We can even include variants, such as R5d instance types (local NVMe-based SSDs) and R5a instance types (powered by AMD processors). +To apply the instance diversification best practices while meeting the application constraints defined in the previous section, we can add different instance sizes from the current generation, such as R5 and R4. We can even include variants, such as R5d instance types (local NVMe-based SSDs) and R5a instance types (powered by AMD processors). {{% notice info %}} -There are over 275 different instance types available on EC2 which can make the process of selecting appropriate instance types difficult.
**[amazon-ec2-instance-selector](https://github.com/aws/amazon-ec2-instance-selector)** helps you select compatible instance types for your application to run on. The command line interface can be passed resource criteria like vCPUs, memory, network performance, and much more and then return the available, matching instance types. +There are over 500 different instance types available on EC2 which can make the process of selecting appropriate instance types difficult. **[amazon-ec2-instance-selector](https://github.com/aws/amazon-ec2-instance-selector)** helps you select compatible instance types for your application to run on. The command line interface can be passed resource criteria like vCPUs, memory, network performance, and much more and then return the available, matching instance types. {{% /notice %}} -We will use **amazon-ec2-instance-selector** to help us select the relevant instance +We will use amazon-ec2-instance-selector to help us select the relevant instance types with sufficient number of vCPUs and RAM. Let's first install amazon-ec2-instance-selector on Cloud9 IDE: ``` -curl -Lo ec2-instance-selector https://github.com/aws/amazon-ec2-instance-selector/releases/download/v1.3.0/ec2-instance-selector-`uname | tr '[:upper:]' '[:lower:]'`-amd64 && chmod +x ec2-instance-selector +curl -Lo ec2-instance-selector https://github.com/aws/amazon-ec2-instance-selector/releases/download/v2.3.2/ec2-instance-selector-`uname | tr '[:upper:]' '[:lower:]'`-amd64 && chmod +x ec2-instance-selector sudo mv ec2-instance-selector /usr/local/bin/ ec2-instance-selector --version ``` -Now that you have ec2-instance-selector installed, you can run -`ec2-instance-selector --help`, to understand how you could use it for selecting -instances that match your workload requirements. +Now that you have ec2-instance-selector installed, you can run `ec2-instance-selector --help`, to understand how you could use it for selecting instance types that match your workload requirements. + +For the purpose of this workshop we will select instance types based on the following criteria: -For the purpose of this workshop we will select instances based on below criteria: * Instances that have minimum 4 vCPUs and maximum 16 vCPUs - * Instances which have vCPU to Memory ratio of 1:8, same as R Instance family - * Instances with CPU Architecture x86_64 and no GPU Instances + * Instances which have vCPU to Memory ratio of 1:8, same as R instance family + * Instances with CPU Architecture x86_64 and no GPU instance types. * Instances that belong to current generation - * Instances types that are not supported by EMR such as R5N, R5ad and R5b. Enhanced z, I and D Instance families, which are priced higher than R family. So basically, adding a deny list with the regular expression `.*n.*|.*ad.*|.*b.*|^[zid].*`. + * Exclude instance types that are not supported by EMR by adding the --service emr-5.xx.0 flag. Set the correct *Release label* of EMR; it should match the EMR version you will choose later during the cluster creation steps. + * Exclude enhanced instance types (z, I and D) that are priced higher than the R family. So basically, we add a deny list with the regular expression `^[zid].*`. {{% notice info %}} **[Click here](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-supported-instance-types.html)** to find out the instance types that Amazon EMR supports . {{% /notice %}} -Run the following command with above mentioned criteria, to get the list of instances.
+Run the following command with above mentioned criteria, to get the list of instance types. You need to change the EMR release label to match your cluster version. ```bash -ec2-instance-selector --vcpus-min 4 --vcpus-max 16 --vcpus-to-memory-ratio 1:8 --cpu-architecture x86_64 --current-generation --gpus 0 --deny-list '.*n.*|.*ad.*|.*b.*|^[zid].*' +ec2-instance-selector --vcpus-min 4 --vcpus-max 16 --vcpus-to-memory-ratio 1:8 --cpu-architecture x86_64 --current-generation --gpus 0 --service emr-5.36.0 --deny-list '^[zid].*' ``` Internally ec2-instance-selector is making calls to the [DescribeInstanceTypes](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeInstanceTypes.html) for the specific region and filtering -the instances based on the criteria selected in the command line. Above command should display a list like the one that follows (note results might differ depending on the region). We will use this instances as part of our EMR Core and Task Instance Fleets. +the instance types based on the criteria selected in the command line. Above command should display a list like the one that follows (note results might differ depending on the region). We will use below instance types as part of our EMR Core and Task instance fleets. ``` r4.2xlarge @@ -56,12 +56,18 @@ r5.xlarge r5a.2xlarge r5a.4xlarge r5a.xlarge +r5b.2xlarge +r5b.4xlarge +r5b.xlarge r5d.2xlarge r5d.4xlarge -r5d.xlarge +r5d.xlarge +r5dn.2xlarge +r5dn.4xlarge +r5dn.xlarge ``` {{% notice note %}} -You are encouraged to test what are the options that `ec2-instance-selector` provides and run a few commands with it to familiarize yourself with the tool. -For example, try running the same commands as you did before with the extra parameter **`--output table-wide`**. -{{% /notice %}} \ No newline at end of file +You are encouraged to test other options that `ec2-instance-selector` provides and run a few commands with it to familiarize yourself with the tool. +For example, try running the same commands as you did before with the extra parameter `--output table-wide`. +{{% /notice %}} diff --git a/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_experiment.md b/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_experiment.md index dc263865..c269eb82 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_experiment.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_experiment.md @@ -87,7 +87,7 @@ Outputs: Here are some important notes about the template: * You can configure how many instances you want to interrupt with the `InstancesToInterrupt` parameter. In the template it's defined that it's going to interrupt **three** instances. -* You can also configure how much time you want the expriment to run with the `DurationBeforeInterruption` parameter. By default, it's going to take two minutes. This means that as soon as you launch the experiment, the instance is going to receive the two-minute notification Spot interruption warning. +* You can also configure how much time you want the experiment to run with the `DurationBeforeInterruption` parameter. By default, it's going to take two minutes. This means that as soon as you launch the experiment, the instance is going to receive the two-minute notification Spot interruption warning. * The most important section is the `Targets` from the experiment template. 
The template has two placeholders `ResourceTagKey` and `ResourceTagValue` which are basically the key/value for the tags to use when choosing the instances to interrupt. We're going to run a `sed` command to replace them with the proper values for this workshop. * Notice that instances are **chosen randomly**, and only those who are in the `running` state. diff --git a/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_fis.md b/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_fis.md index 5192d10e..082d9548 100644 --- a/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_fis.md +++ b/content/running_spark_apps_with_emr_on_spot_instances/spot_interruption_fis.md @@ -3,11 +3,11 @@ title: "Interrupting a Spot Instance" weight: 98 --- -In this section, you're going to launch a Spot Interruption using FIS and then verify that the capacity has been replenished and EMR was able to continue running the Spark job. This will help you to confirm the low impact of your workloads when implemeting Spot effectively. Moreover, you can discover hidden weaknesses, and make your workloads fault-tolerant and resilient. +In this section, you're going to launch a Spot Interruption using FIS and then verify that the capacity has been replenished and EMR was able to continue running the Spark job. This will help you to confirm the low impact of your workloads when implementing Spot effectively. Moreover, you can discover hidden weaknesses, and make your workloads fault-tolerant and resilient. #### (Optional) Re-Launch the Spark Application -The Spark job could take around seven to eight minutes to finish. However, when you arrive to this part of the workshop, either the job is about to finish or has finished already. So, here are the commands you need to run to re-laun the Spark job in EMR. +The Spark job could take around seven to eight minutes to finish. However, when you arrive at this part of the workshop, either the job is about to finish or has finished already. So, here are the commands you need to run to re-launch the Spark job in EMR. First, you need to empty the results folder in the S3 bucket. Run the following command (replace the bucket name with yours): @@ -46,9 +46,11 @@ You should see a log message like this one: ![SpotInterruptionLog](/images/running-emr-spark-apps-on-spot/spotinterruptionlog.png) -#### Verify that EMR Instance Fleet replenished the capacity +#### Verify the actions taken by EMR instance fleet -Run the following command to get an understanding of how many instances are currently running before the Spot interruption: +You are running EMR instance fleets with managed cluster scaling, which constantly monitors key metrics and automatically increases or decreases the number of instances or units in your cluster based on workload. The EMR instance fleet can launch replacement instances if you managed to start the FIS experiment within the first minute of the Spark job and the Spark job runs for an additional 4 to 5 minutes. + +You can run the following command to see the list of instances with the date and time when they were launched.
``` aws ec2 describe-instances --filters\ @@ -57,20 +59,7 @@ aws ec2 describe-instances --filters\ | jq '.Reservations[].Instances[] | "Instance with ID:\(.InstanceId) launched at \(.LaunchTime)"' ``` -You should see a list of instances with the date and time when they were launched, like this: - -```output -"Instance with ID:i-06a82769173489f32 launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-06c97b509c5e274e0 launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-002b073c6479a5aba launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-0e96071afef3fc145 launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-0a3ddb3903526c712 launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-05717d5d7954b0250 launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-0bcd467f88ddd830e launched at 2022-04-06T14:02:49+00:00" -"Instance with ID:i-04c6ced30794e965b launched at 2022-04-06T14:02:49+00:00" -``` - -Wait around three minutes while the interrupted instances terminates, and the new instances finish bootstrapping. Run the previous command again to confirm that the new Spot instances are running, and the output will be like the following: +You should see a list of instances with the date and time when they were launched. If managed scaling launched the replacements then you would see new instances with launch time different from the others. ```output "Instance with ID:i-06a82769173489f32 launched at 2022-04-06T14:02:49+00:00" @@ -83,12 +72,26 @@ Wait around three minutes while the interrupted instances terminates, and the ne "Instance with ID:i-08818dc9ba688c3da launched at 2022-04-06T14:11:25+00:00" ``` -Notice how the launch time from the last instances are different from the others. - #### Verify that the Spark application completed successfully Follow the same steps from ["Examining the cluster"](/running_spark_apps_with_emr_on_spot_instances/examining_cluster.html) to launch the Spark History Server and explore the details of the recent Spark job submission. In the home screen, click on the latest App ID (if it's empty, wait for the job to finish) to see the execution details. You should see something like this: ![SparkJobCompleted](/images/running-emr-spark-apps-on-spot/sparkjobcompleted.png) -Notice how two minutes around after the job started, three executors were removed (each executor is a Spot instance). The job didn't stop, and when the new Spot instances were launched by EMR, Spark included them as new executors again to catch-up on completing the job. The job took around eight minutes to finish. If you don't see executors being added, you could re-launch the Spark job and start the FIS experiment right away. \ No newline at end of file +Notice how two minutes around after the job started, three executors were removed (each executor is a Spot instance). If your job runs long enough then you can see new executors being launched to catch-up on completing the job. In this example the job took around eight minutes to finish. If you don't see executors being added, you could re-launch the Spark job and start the FIS experiment as soon as the spark job starts. + +{{%expand "Question: As a result of Spot interruptions you might see different results, for example: all stages of your Spark jobs passed without any error, or a single stage was failed and then re-tried. Do you know why this happens and what actions are taken by EMR on the instances that were interrupted? Click to expand the answer." 
%}} + +#### Actions for decommissioned nodes +When a Spot Instance is interrupted, no new tasks get scheduled on that node; once the active containers become idle (or the timeout expires), the node gets decommissioned. When the Spark driver receives the decommission signal, it can take the following additional actions to start the recovery process sooner rather than waiting for a fetch failure to occur: + +* All of the shuffle outputs on the decommissioned node are unregistered, thus marking them as unavailable. Amazon EMR enables this by default with the setting `spark.resourceManager.cleanupExpiredHost` set to true. This has the following advantages: + + * **If a single node is decommissioned during the map stage**: the lost shuffle output is recomputed elsewhere before proceeding to the next stage. Recovery is faster because the shuffle blocks are recomputed during the map stage instead of surfacing as failures during the shuffle stage. + + * **If a single node is decommissioned during the shuffle stage**: the target executors immediately send the fetch failure to the driver instead of retrying to fetch the lost shuffle block multiple times. The driver then immediately fails the stage and starts recomputing the lost shuffle output. This reduces the time spent trying to fetch shuffle blocks from lost nodes. + + * **If multiple nodes are decommissioned during any stage**: when Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from the decommissioned nodes and recovers them in a single attempt. This speeds up the recovery process significantly over the open-source Spark implementation. + +When a stage fails because of fetch failures from a node being decommissioned, by default Amazon EMR does not count the stage failure toward the maximum number of failures allowed for a stage, because the setting `spark.stage.attempt.ignoreOnDecommissionFetchFailure` is set to true. This prevents a job from failing if a stage fails multiple times because of node failures due to Spot Instance termination.
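The sketch below is an optional illustration (not part of the original workshop) of how the two Spark properties mentioned above could be written down as an EMR `spark-defaults` configuration classification. The file name is hypothetical, and EMR already applies these values by default, so you would only need something like this if you wanted to override them explicitly.

```bash
# Minimal sketch: the two EMR defaults discussed above, expressed as a
# spark-defaults classification. The file name is hypothetical; EMR already
# sets these values by default, so this is only needed to override them.
cat > decommission-settings.json <<'EOF'
[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.resourceManager.cleanupExpiredHost": "true",
      "spark.stage.attempt.ignoreOnDecommissionFetchFailure": "true"
    }
  }
]
EOF
# The file could then be passed when creating a cluster, for example:
# aws emr create-cluster ... --configurations file://decommission-settings.json
```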
+{{% /expand%}} \ No newline at end of file diff --git a/static/images/running-emr-spark-apps-on-spot/allocation-strategy.png b/static/images/running-emr-spark-apps-on-spot/allocation-strategy.png new file mode 100644 index 00000000..a8bcd873 Binary files /dev/null and b/static/images/running-emr-spark-apps-on-spot/allocation-strategy.png differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-core.png b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-core.png new file mode 100644 index 00000000..2f0a6103 Binary files /dev/null and b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-core.png differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-core1.png b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-core1.png deleted file mode 100644 index 113ae4a3..00000000 Binary files a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-core1.png and /dev/null differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-master.png b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-master.png index b76554ec..3fe6cebf 100644 Binary files a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-master.png and b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-master.png differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-task.png b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-task.png new file mode 100644 index 00000000..9ab52ba7 Binary files /dev/null and b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-task.png differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-task2.png b/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-task2.png deleted file mode 100644 index 645e6af3..00000000 Binary files a/static/images/running-emr-spark-apps-on-spot/emrinstancefleets-task2.png and /dev/null differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrmanagedscaling.png b/static/images/running-emr-spark-apps-on-spot/emrmanagedscaling.png new file mode 100644 index 00000000..c678a8d3 Binary files /dev/null and b/static/images/running-emr-spark-apps-on-spot/emrmanagedscaling.png differ diff --git a/static/images/running-emr-spark-apps-on-spot/emrsparkscalingevent.png b/static/images/running-emr-spark-apps-on-spot/emrsparkscalingevent.png new file mode 100644 index 00000000..feb8f94a Binary files /dev/null and b/static/images/running-emr-spark-apps-on-spot/emrsparkscalingevent.png differ