From 4caf4d28819d82240f1d61e11cdabd68fafad14a Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Wed, 22 Mar 2017 19:59:38 -0700 Subject: [PATCH 1/6] [SPARK-18364][YARN] Expose metrics for YarnShuffleService Registers the shuffle server's metrics with the Hadoop Node Manager's DefaultMetricsSystem. --- .../network/yarn/YarnShuffleService.java | 25 +++- .../yarn/YarnShuffleServiceMetrics.java | 116 ++++++++++++++++++ 2 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index c7620d0fe1288..91d14491a87a6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -19,8 +19,9 @@ import java.io.File; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -35,9 +36,12 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; -import org.apache.spark.network.util.LevelDBProvider; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; @@ -50,6 +54,7 @@ import org.apache.spark.network.server.TransportServer; import org.apache.spark.network.server.TransportServerBootstrap; import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler; +import org.apache.spark.network.util.LevelDBProvider; import org.apache.spark.network.util.TransportConf; import org.apache.spark.network.yarn.util.HadoopConfigProvider; @@ -166,6 +171,22 @@ protected void serviceInit(Configuration conf) throws Exception { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + // register metrics on the block handler into the Node Manager's metrics system. 
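+ // MetricsSystemImpl#registerSource is invoked reflectively and made accessible below because
+ // it is not directly accessible through the public MetricsSystem interface (hence setAccessible).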
+ try { + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + + Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", + String.class, String.class, MetricsSource.class); + registerSourceMethod.setAccessible(true); + registerSourceMethod.invoke(metricsSystem, "shuffleservice", "Metrics on the Spark " + + "Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + } catch (Exception e) { + logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + + "proceeding without metrics", e); + } + // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000000..4c104b47357bf --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; +import com.codahale.metrics.Timer; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; + +import java.util.Map; + +/** + * Modeled off of YARN's NodeManagerMetrics. + */ +public class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. 
+ */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + String name = entry.getKey(); + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (entry.getValue() instanceof Timer) { + Timer t = (Timer) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (entry.getValue() instanceof Meter) { + Meter m = (Meter) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (entry.getValue() instanceof Gauge) { + Gauge m = (Gauge) entry.getValue(); + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); + } + } + } + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +} From 13aa4ff86038d706e7f3c3dfff449c6e762205f3 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Thu, 23 Mar 2017 22:27:03 -0700 Subject: [PATCH 2/6] 2-space indentation on java files; pass ./dev/lint-java --- .../network/yarn/YarnShuffleService.java | 4 +- .../yarn/YarnShuffleServiceMetrics.java | 146 +++++++++--------- 2 files changed, 76 insertions(+), 74 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 91d14491a87a6..d33d45ba826be 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -37,7 +37,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.metrics2.MetricsSource; -import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; import 
org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -173,7 +172,8 @@ protected void serviceInit(Configuration conf) throws Exception { // register metrics on the block handler into the Node Manager's metrics system. try { - YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics( + blockHandler.getAllMetrics()); MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 4c104b47357bf..229b05842d48e 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -17,11 +17,7 @@ package org.apache.spark.network.yarn; -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Metric; -import com.codahale.metrics.MetricSet; -import com.codahale.metrics.Timer; +import com.codahale.metrics.*; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -34,83 +30,89 @@ */ public class YarnShuffleServiceMetrics implements MetricsSource { - private final MetricSet metricSet; + private final MetricSet metricSet; - public YarnShuffleServiceMetrics(MetricSet metricSet) { - this.metricSet = metricSet; - } + public YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } - /** - * Get metrics from the source - * - * @param collector to contain the resulting metrics snapshot - * @param all if true, return all metrics even if unchanged. - */ - @Override - public void getMetrics(MetricsCollector collector, boolean all) { - MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. 
+ */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); - for (Map.Entry entry : metricSet.getMetrics().entrySet()) { - String name = entry.getKey(); + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + String name = entry.getKey(); - // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics - if (entry.getValue() instanceof Timer) { - Timer t = (Timer) entry.getValue(); - metricsRecordBuilder - .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), - t.getCount()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), - t.getFifteenMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), - t.getFiveMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), - t.getOneMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), - t.getMeanRate()); - } else if (entry.getValue() instanceof Meter) { - Meter m = (Meter) entry.getValue(); - metricsRecordBuilder - .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), - m.getCount()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), - m.getFifteenMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), - m.getFiveMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), - m.getOneMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), - m.getMeanRate()); - } else if (entry.getValue() instanceof Gauge) { - Gauge m = (Gauge) entry.getValue(); - Object gaugeValue = m.getValue(); - if (gaugeValue instanceof Integer) { - Integer intValue = (Integer) gaugeValue; - metricsRecordBuilder - .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + - "gauge " + name), intValue.intValue()); - } - } + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (entry.getValue() instanceof Timer) { + Timer t = (Timer) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (entry.getValue() instanceof Meter) { + Meter m = (Meter) entry.getValue(); + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + 
"_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (entry.getValue() instanceof Gauge) { + Gauge m = (Gauge) entry.getValue(); + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; + metricsRecordBuilder + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); } + } } + } - private static class ShuffleServiceMetricsInfo implements MetricsInfo { + private static class ShuffleServiceMetricsInfo implements MetricsInfo { - private final String name; - private final String description; + private final String name; + private final String description; - ShuffleServiceMetricsInfo(String name, String description) { - this.name = name; - this.description = description; - } + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } - @Override - public String name() { - return name; - } + @Override + public String name() { + return name; + } - @Override - public String description() { - return description; - } + @Override + public String description() { + return description; } + } } From 83b1d732b879b1f735fb75b244c6b16132ff82f3 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Fri, 24 Mar 2017 00:13:55 -0700 Subject: [PATCH 3/6] Test metric collector gets right converted calls --- .../shuffle/ExternalShuffleBlockHandler.java | 5 +- .../yarn/YarnShuffleServiceMetrics.java | 89 ++++++++++--------- .../yarn/YarnShuffleServiceMetricsSuite.scala | 74 +++++++++++++++ 3 files changed, 124 insertions(+), 44 deletions(-) create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java index 6daf9609d76dc..1b340e801adf3 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java @@ -176,7 +176,8 @@ private void checkAuth(TransportClient client, String appId) { /** * A simple class to wrap all shuffle service wrapper metrics */ - private class ShuffleMetrics implements MetricSet { + @VisibleForTesting + public class ShuffleMetrics implements MetricSet { private final Map allMetrics; // Time latency for open block request in ms private final Timer openBlockRequestLatencyMillis = new Timer(); @@ -185,7 +186,7 @@ private class ShuffleMetrics implements MetricSet { // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); - private ShuffleMetrics() { + public ShuffleMetrics() { allMetrics = new HashMap<>(); allMetrics.put("openBlockRequestLatencyMillis", openBlockRequestLatencyMillis); allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 229b05842d48e..86cb07ae711ac 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -18,6 +18,7 @@ 
package org.apache.spark.network.yarn; import com.codahale.metrics.*; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; @@ -47,50 +48,54 @@ public void getMetrics(MetricsCollector collector, boolean all) { MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService"); for (Map.Entry entry : metricSet.getMetrics().entrySet()) { - String name = entry.getKey(); + collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); + } + } - // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics - if (entry.getValue() instanceof Timer) { - Timer t = (Timer) entry.getValue(); - metricsRecordBuilder - .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), - t.getCount()) - .addGauge( - new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), - t.getFifteenMinuteRate()) - .addGauge( - new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), - t.getFiveMinuteRate()) - .addGauge( - new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), - t.getOneMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), - t.getMeanRate()); - } else if (entry.getValue() instanceof Meter) { - Meter m = (Meter) entry.getValue(); + @VisibleForTesting + public static void collectMetric(MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + + // The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics + if (metric instanceof Timer) { + Timer t = (Timer) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (metric instanceof Gauge) { + Gauge m = (Gauge) metric; + Object gaugeValue = m.getValue(); + if (gaugeValue instanceof Integer) { + Integer intValue = (Integer) gaugeValue; metricsRecordBuilder - .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), - m.getCount()) - .addGauge( - new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), - m.getFifteenMinuteRate()) - .addGauge( - new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " 
+ name), - m.getFiveMinuteRate()) - .addGauge( - new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), - m.getOneMinuteRate()) - .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), - m.getMeanRate()); - } else if (entry.getValue() instanceof Gauge) { - Gauge m = (Gauge) entry.getValue(); - Object gaugeValue = m.getValue(); - if (gaugeValue instanceof Integer) { - Integer intValue = (Integer) gaugeValue; - metricsRecordBuilder - .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + - "gauge " + name), intValue.intValue()); - } + .addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " + + "gauge " + name), intValue.intValue()); } } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala new file mode 100644 index 0000000000000..01d634b6ccdab --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.network.yarn + +import org.apache.hadoop.metrics2.MetricsRecordBuilder +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.Matchers + +import scala.collection.JavaConverters._ + +class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { + + val streamManager = mock(classOf[OneForOneStreamManager]) + val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) + when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) + + val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics + + test("metrics named as expected") { + val allMetrics = Set( + "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes", "registeredExecutorsSize") + + metrics.getMetrics.keySet().asScala should be (allMetrics) + } + + // these three metrics have the same effect on the collector + for (testname <- Seq("openBlockRequestLatencyMillis", + "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes")) { + test(s"$testname - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, testname, + metrics.getMetrics.get(testname)) + + verify(builder).addCounter(anyObject(), anyLong()) + verify(builder, times(4)).addGauge(anyObject(), anyDouble()) + } + } + + // this metric writes only one gauge to the collector + test("registeredExecutorsSize - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", + metrics.getMetrics.get("registeredExecutorsSize")) + + // only one + verify(builder).addGauge(anyObject(), anyInt()) + } +} From 9992c10ccb2f38a7089ac21c31d3adaf23c9a2ba Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Fri, 24 Mar 2017 00:14:22 -0700 Subject: [PATCH 4/6] camel-case shuffleService --- .../java/org/apache/spark/network/yarn/YarnShuffleService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index d33d45ba826be..63b87ae35b242 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -179,7 +179,7 @@ protected void serviceInit(Configuration conf) throws Exception { Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", String.class, String.class, MetricsSource.class); registerSourceMethod.setAccessible(true); - registerSourceMethod.invoke(metricsSystem, "shuffleservice", "Metrics on the Spark " + + registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + "Shuffle Service", serviceMetrics); logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); } catch (Exception 
e) { From 96a0882843e9072bb2ef02ee798a6e5d2f0a41b0 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Fri, 24 Mar 2017 09:45:07 -0700 Subject: [PATCH 5/6] Pass scalastyle --- .../network/yarn/YarnShuffleServiceMetricsSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index 01d634b6ccdab..1330ceebf5087 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -17,15 +17,15 @@ package org.apache.spark.network.yarn import org.apache.hadoop.metrics2.MetricsRecordBuilder -import org.apache.spark.SparkFunSuite -import org.apache.spark.network.server.OneForOneStreamManager -import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} import org.mockito.Matchers._ import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest.Matchers - import scala.collection.JavaConverters._ +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} +import org.apache.spark.SparkFunSuite + class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { val streamManager = mock(classOf[OneForOneStreamManager]) From 7c7d6d4c4c6f572e6e1646ff0b8e6e99e95b43c3 Mon Sep 17 00:00:00 2001 From: Andrew Ash Date: Tue, 28 Mar 2017 10:03:36 -0700 Subject: [PATCH 6/6] Reformat and organize imports With import order specified at http://spark.apache.org/contributing.html --- .../network/yarn/YarnShuffleService.java | 29 +++++++++---------- .../yarn/YarnShuffleServiceMetricsSuite.scala | 5 ++-- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 63b87ae35b242..b5ba117db2552 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.api.*; import org.iq80.leveldb.DB; import org.iq80.leveldb.DBIterator; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,13 +58,13 @@ /** * An external shuffle service used by Spark on Yarn. - * + *
* This is intended to be a long-running auxiliary service that runs in the NodeManager process. * A Spark application may connect to this service by setting `spark.shuffle.service.enabled`. * The application also automatically derives the service port through `spark.shuffle.service.port` * specified in the Yarn configuration. This is so that both the clients and the server agree on * the same port to communicate on. - * + *
* The service also optionally supports authentication. This ensures that executors from one * application cannot read the shuffle files written by those from another. This feature can be * enabled by setting `spark.authenticate` in the Yarn configuration before starting the NM. @@ -100,7 +99,7 @@ public class YarnShuffleService extends AuxiliaryService { private static final ObjectMapper mapper = new ObjectMapper(); private static final String APP_CREDS_KEY_PREFIX = "AppCreds"; private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider - .StoreVersion(1, 0); + .StoreVersion(1, 0); // just for integration tests that want to look at this file -- in general not sensible as // a static @@ -177,14 +176,14 @@ protected void serviceInit(Configuration conf) throws Exception { MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource", - String.class, String.class, MetricsSource.class); + String.class, String.class, MetricsSource.class); registerSourceMethod.setAccessible(true); registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " + - "Shuffle Service", serviceMetrics); + "Shuffle Service", serviceMetrics); logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); } catch (Exception e) { logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " + - "proceeding without metrics", e); + "proceeding without metrics", e); } // If authentication is enabled, set up the shuffle server to use a @@ -205,7 +204,7 @@ protected void serviceInit(Configuration conf) throws Exception { boundPort = port; String authEnabledString = authEnabled ? "enabled" : "not enabled"; logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}. Registered executor file is {}", port, authEnabledString, + "Authentication is {}. 
Registered executor file is {}", port, authEnabledString, registeredExecutorFile); } catch (Exception e) { if (stopOnFailure) { @@ -222,7 +221,7 @@ private void createSecretManager() throws IOException { // Make sure this is protected in case its not in the NM recovery dir FileSystem fs = FileSystem.getLocal(_conf); - fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700)); + fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700)); db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper); logger.info("Recovery location is: " + secretsFile.getPath()); @@ -363,10 +362,10 @@ protected Path getRecoveryPath(String fileName) { */ protected File initRecoveryDb(String dbFileName) { if (_recoveryPath != null) { - File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName); - if (recoveryFile.exists()) { - return recoveryFile; - } + File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName); + if (recoveryFile.exists()) { + return recoveryFile; + } } // db doesn't exist in recovery path go check local dirs for it String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); @@ -433,8 +432,8 @@ public int hashCode() { @Override public String toString() { return Objects.toStringHelper(this) - .add("appId", appId) - .toString(); + .add("appId", appId) + .toString(); } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index 1330ceebf5087..183545c94f329 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -16,15 +16,16 @@ */ package org.apache.spark.network.yarn +import scala.collection.JavaConverters._ + import org.apache.hadoop.metrics2.MetricsRecordBuilder import org.mockito.Matchers._ import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest.Matchers -import scala.collection.JavaConverters._ +import org.apache.spark.SparkFunSuite import org.apache.spark.network.server.OneForOneStreamManager import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} -import org.apache.spark.SparkFunSuite class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
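For reference, here is a sketch (not part of these patches) of one further check that could sit alongside the suite above: it drives YarnShuffleServiceMetrics#getMetrics end to end with a mocked Hadoop MetricsCollector and verifies that the handler's metrics land in a single "shuffleService" record. The suite and test names are illustrative only; the stubs mirror the ones already used in YarnShuffleServiceMetricsSuite.

package org.apache.spark.network.yarn

import org.apache.hadoop.metrics2.{MetricsCollector, MetricsRecordBuilder}
import org.mockito.Matchers._
import org.mockito.Mockito.{mock, verify, when}

import org.apache.spark.SparkFunSuite
import org.apache.spark.network.server.OneForOneStreamManager
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}

class YarnShuffleServiceMetricsSketchSuite extends SparkFunSuite {

  test("getMetrics writes every handler metric into one shuffleService record") {
    val blockResolver = mock(classOf[ExternalShuffleBlockResolver])
    when(blockResolver.getRegisteredExecutorsSize).thenReturn(42)
    val handler = new ExternalShuffleBlockHandler(
      mock(classOf[OneForOneStreamManager]), blockResolver)

    // Stub the builder so the chained addCounter/addGauge calls in collectMetric return it.
    val builder = mock(classOf[MetricsRecordBuilder])
    when(builder.addCounter(any(), anyLong())).thenReturn(builder)
    when(builder.addGauge(any(), anyDouble())).thenReturn(builder)
    when(builder.addGauge(any(), anyInt())).thenReturn(builder)
    val collector = mock(classOf[MetricsCollector])
    when(collector.addRecord(anyString())).thenReturn(builder)

    new YarnShuffleServiceMetrics(handler.getAllMetrics).getMetrics(collector, true)

    // Exactly one record named "shuffleService", regardless of how many metrics the handler exposes.
    verify(collector).addRecord("shuffleService")
  }
}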