diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 13df6ccae5a7..e72c86522bfd 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -19,7 +19,6 @@
 import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
 import static org.apache.beam.runners.spark.SparkCommonPipelineOptions.prepareFilesToStage;
-import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;
 
 import edu.umd.cs.findbugs.annotations.Nullable;
 import java.util.UUID;
@@ -50,7 +49,6 @@
 import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
 import org.apache.beam.runners.spark.translation.SparkTranslationContext;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
-import org.apache.beam.runners.spark.util.SparkCompat;
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.metrics.MetricsOptions;
@@ -58,12 +56,9 @@
 import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.scheduler.EventLoggingListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
-import org.joda.time.Instant;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -117,10 +112,6 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
     PortablePipelineResult result;
     final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
 
-    final long startTime = Instant.now().getMillis();
-    EventLoggingListener eventLoggingListener =
-        startEventLoggingListener(jsc, pipelineOptions, startTime);
-
     // Initialize accumulators.
     AggregatorsAccumulator.init(pipelineOptions, jsc);
     MetricsEnvironment.setMetricsSupported(true);
@@ -205,14 +196,6 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
             result);
     metricsPusher.start();
 
-    if (eventLoggingListener != null) {
-      eventLoggingListener.onApplicationStart(
-          SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
-      eventLoggingListener.onApplicationEnd(
-          new SparkListenerApplicationEnd(Instant.now().getMillis()));
-      eventLoggingListener.stop();
-    }
-
     return result;
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 747a926e75b4..ed434a2e348b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.spark;
 
 import static org.apache.beam.runners.spark.SparkCommonPipelineOptions.prepareFilesToStage;
-import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -43,7 +42,6 @@
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
-import org.apache.beam.runners.spark.util.SparkCompat;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -68,12 +66,9 @@
 import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
-import org.apache.spark.scheduler.EventLoggingListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,9 +164,6 @@ public SparkPipelineResult run(final Pipeline pipeline) {
 
     prepareFilesToStage(pipelineOptions);
 
-    final long startTime = Instant.now().getMillis();
-    EventLoggingListener eventLoggingListener = null;
-    JavaSparkContext jsc = null;
     if (pipelineOptions.isStreaming()) {
       CheckpointDir checkpointDir = new CheckpointDir(pipelineOptions.getCheckpointDir());
       SparkRunnerStreamingContextFactory streamingContextFactory =
@@ -179,9 +171,6 @@ public SparkPipelineResult run(final Pipeline pipeline) {
       final JavaStreamingContext jssc =
           JavaStreamingContext.getOrCreate(
               checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);
-      jsc = jssc.sparkContext();
-      eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);
-
       // Checkpoint aggregator/metrics values
       jssc.addStreamingListener(
           new JavaStreamingListenerWrapper(
@@ -217,8 +206,7 @@ public SparkPipelineResult run(final Pipeline pipeline) {
 
       result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
     } else {
-      jsc = SparkContextFactory.getSparkContext(pipelineOptions);
-      eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);
+      JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
       final EvaluationContext evaluationContext =
           new EvaluationContext(jsc, pipeline, pipelineOptions);
       translator = new TransformTranslator.Translator();
@@ -253,14 +241,6 @@ public SparkPipelineResult run(final Pipeline pipeline) {
             result);
     metricsPusher.start();
 
-    if (eventLoggingListener != null && jsc != null) {
-      eventLoggingListener.onApplicationStart(
-          SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
-      eventLoggingListener.onApplicationEnd(
-          new SparkListenerApplicationEnd(Instant.now().getMillis()));
-      eventLoggingListener.stop();
-    }
-
     return result;
   }
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index 02745c927c7e..298db0fc68a9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -61,16 +61,6 @@ public class SparkBeamMetric implements Metric {
     return metrics;
   }
 
-  public static Map<String, String> renderAllToString(MetricResults metricResults) {
-    Map<String, String> metricsString = new HashMap<>();
-    for (Map.Entry<String, ?> entry : renderAll(metricResults).entrySet()) {
-      String key = entry.getKey();
-      String value = String.valueOf(entry.getValue());
-      metricsString.put(key, value);
-    }
-    return metricsString;
-  }
-
   Map<String, ?> renderAll() {
     MetricResults metricResults =
         asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCommon.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCommon.java
deleted file mode 100644
index 6f1f1f72cf48..000000000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCommon.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.scheduler.EventLoggingListener;
-import org.apache.spark.scheduler.SparkListenerExecutorAdded;
-import org.apache.spark.scheduler.cluster.ExecutorInfo;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import scala.Tuple2;
-
-/** Common methods to build Spark specific objects used by different runners. */
-@Internal
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
-public class SparkCommon {
-
-  /**
-   * Starts an EventLoggingListener to save Beam Metrics on Spark's History Server if event logging
-   * is enabled.
-   *
-   * @return The associated EventLoggingListener or null if it could not be started.
-   */
-  public static @Nullable EventLoggingListener startEventLoggingListener(
-      final JavaSparkContext jsc, SparkPipelineOptions pipelineOptions, long startTime) {
-    EventLoggingListener eventLoggingListener = null;
-    try {
-      if (jsc.getConf().getBoolean("spark.eventLog.enabled", false)) {
-        eventLoggingListener =
-            new EventLoggingListener(
-                jsc.getConf().getAppId(),
-                scala.Option.apply("1"),
-                new URI(jsc.getConf().get("spark.eventLog.dir", null)),
-                jsc.getConf(),
-                jsc.hadoopConfiguration());
-        eventLoggingListener.initializeLogIfNecessary(false, false);
-        eventLoggingListener.start();
-
-        scala.collection.immutable.Map<String, String> logUrlMap =
-            new scala.collection.immutable.HashMap<>();
-        Tuple2<String, String>[] sparkMasters = jsc.getConf().getAllWithPrefix("spark.master");
-        Tuple2<String, String>[] sparkExecutors =
-            jsc.getConf().getAllWithPrefix("spark.executor.id");
-        for (Tuple2<String, String> sparkExecutor : sparkExecutors) {
-          eventLoggingListener.onExecutorAdded(
-              new SparkListenerExecutorAdded(
-                  startTime,
-                  sparkExecutor._2(),
-                  new ExecutorInfo(sparkMasters[0]._2(), 0, logUrlMap)));
-        }
-        return eventLoggingListener;
-      }
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(
-          "The URI syntax in the Spark config \"spark.eventLog.dir\" is not correct", e);
-    }
-    return eventLoggingListener;
-  }
-}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCompat.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCompat.java
index 27b759baead0..aa352b4b85e0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCompat.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkCompat.java
@@ -17,27 +17,18 @@
  */
 package org.apache.beam.runners.spark.util;
 
-import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.List;
 import java.util.stream.Collectors;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.metrics.SparkBeamMetric;
 import org.apache.beam.runners.spark.translation.SparkCombineFn;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import scala.Option;
-import scala.collection.JavaConverters;
 
 /** A set of functions to provide API compatibility between Spark 2 and Spark 3. */
 @SuppressWarnings({
@@ -123,56 +114,4 @@ public static JavaPairRDD
       throw new RuntimeException("Error invoking Spark flatMapValues", e);
     }
   }
-
-  public static SparkListenerApplicationStart buildSparkListenerApplicationStart(
-      final JavaSparkContext jsc, SparkPipelineOptions options, long time, PipelineResult result) {
-    String appName = options.as(ApplicationNameOptions.class).getAppName();
-    Option<String> appId = Option.apply(jsc.getConf().getAppId());
-    Option<String> appAttemptId = Option.apply("1");
-    Option<scala.collection.Map<String, String>> driverLogs =
-        Option.apply(
-            JavaConverters.mapAsScalaMapConverter(
-                    SparkBeamMetric.renderAllToString(result.metrics()))
-                .asScala());
-    try {
-      Class<?> clazz = Class.forName(SparkListenerApplicationStart.class.getName());
-      if (jsc.version().startsWith("3")) {
-        // This invokes by Reflection the equivalent of
-        // return new SparkListenerApplicationStart(
-        //     appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs, driverAttributes);
-        Class<?>[] parameterTypes = {
-          String.class,
-          Option.class,
-          Long.TYPE,
-          String.class,
-          Option.class,
-          Option.class,
-          Option.class
-        };
-        Constructor<?> cons = clazz.getConstructor(parameterTypes);
-        Option<scala.collection.Map<String, String>> driverAttributes =
-            Option.apply(new scala.collection.immutable.HashMap<>());
-        Object[] args = {
-          appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs, driverAttributes
-        };
-        return (SparkListenerApplicationStart) cons.newInstance(args);
-      } else {
-        // This invokes by Reflection the equivalent of
-        // return new SparkListenerApplicationStart(
-        //     appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs);
-        Class<?>[] parameterTypes = {
-          String.class, Option.class, Long.TYPE, String.class, Option.class, Option.class
-        };
-        Constructor<?> cons = clazz.getConstructor(parameterTypes);
-        Object[] args = {appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs};
-        return (SparkListenerApplicationStart) cons.newInstance(args);
-      }
-    } catch (ClassNotFoundException
-        | NoSuchMethodException
-        | IllegalAccessException
-        | InvocationTargetException
-        | InstantiationException e) {
-      throw new RuntimeException("Error building SparkListenerApplicationStart", e);
-    }
-  }
 }