[BEAM-13981] Remove Spark Runner specific code for event logging
iemejia committed Mar 11, 2022
1 parent 9e0aa6b · commit 1aba87d

Showing 5 changed files with 1 addition and 188 deletions.
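Note: with the runner-specific EventLoggingListener gone, Spark's built-in event logging (the mechanism the Spark History Server reads) remains available through standard configuration. A minimal sketch, assuming stock Spark properties; the app name, master, and log directory are illustrative, not from this commit:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EventLogConfigSketch {
  public static void main(String[] args) {
    // Enable Spark's native event logging purely via configuration.
    // "/tmp/spark-events" is an illustrative directory; it must exist
    // before the context starts.
    SparkConf conf =
        new SparkConf()
            .setMaster("local[*]")
            .setAppName("beam-on-spark")
            .set("spark.eventLog.enabled", "true")
            .set("spark.eventLog.dir", "/tmp/spark-events");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    jsc.stop();
  }
}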
SparkPipelineRunner.java
@@ -19,7 +19,6 @@

import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.hasUnboundedPCollections;
import static org.apache.beam.runners.spark.SparkCommonPipelineOptions.prepareFilesToStage;
-import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.UUID;
@@ -50,20 +49,16 @@
import org.apache.beam.runners.spark.translation.SparkStreamingTranslationContext;
import org.apache.beam.runners.spark.translation.SparkTranslationContext;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
-import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.metrics.MetricsOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.scheduler.EventLoggingListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
-import org.joda.time.Instant;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -117,10 +112,6 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
PortablePipelineResult result;
final JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);

-    final long startTime = Instant.now().getMillis();
-    EventLoggingListener eventLoggingListener =
-        startEventLoggingListener(jsc, pipelineOptions, startTime);
-
// Initialize accumulators.
AggregatorsAccumulator.init(pipelineOptions, jsc);
MetricsEnvironment.setMetricsSupported(true);
@@ -205,14 +196,6 @@ public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo)
result);
metricsPusher.start();

-    if (eventLoggingListener != null) {
-      eventLoggingListener.onApplicationStart(
-          SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
-      eventLoggingListener.onApplicationEnd(
-          new SparkListenerApplicationEnd(Instant.now().getMillis()));
-      eventLoggingListener.stop();
-    }
-
return result;
}

SparkRunner.java
@@ -18,7 +18,6 @@
package org.apache.beam.runners.spark;

import static org.apache.beam.runners.spark.SparkCommonPipelineOptions.prepareFilesToStage;
-import static org.apache.beam.runners.spark.util.SparkCommon.startEventLoggingListener;

import java.util.Collection;
import java.util.HashMap;
@@ -43,7 +42,6 @@
import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
-import org.apache.beam.runners.spark.util.SparkCompat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -68,12 +66,9 @@
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
-import org.apache.spark.scheduler.EventLoggingListener;
-import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
-import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -169,19 +164,13 @@ public SparkPipelineResult run(final Pipeline pipeline) {

prepareFilesToStage(pipelineOptions);

-    final long startTime = Instant.now().getMillis();
-    EventLoggingListener eventLoggingListener = null;
-    JavaSparkContext jsc = null;
if (pipelineOptions.isStreaming()) {
CheckpointDir checkpointDir = new CheckpointDir(pipelineOptions.getCheckpointDir());
SparkRunnerStreamingContextFactory streamingContextFactory =
new SparkRunnerStreamingContextFactory(pipeline, pipelineOptions, checkpointDir);
final JavaStreamingContext jssc =
JavaStreamingContext.getOrCreate(
checkpointDir.getSparkCheckpointDir().toString(), streamingContextFactory);
-      jsc = jssc.sparkContext();
-      eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);
-
// Checkpoint aggregator/metrics values
jssc.addStreamingListener(
new JavaStreamingListenerWrapper(
@@ -217,8 +206,7 @@ public SparkPipelineResult run(final Pipeline pipeline) {

result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
} else {
-      jsc = SparkContextFactory.getSparkContext(pipelineOptions);
-      eventLoggingListener = startEventLoggingListener(jsc, pipelineOptions, startTime);
+      JavaSparkContext jsc = SparkContextFactory.getSparkContext(pipelineOptions);
final EvaluationContext evaluationContext =
new EvaluationContext(jsc, pipeline, pipelineOptions);
translator = new TransformTranslator.Translator();
@@ -253,14 +241,6 @@ public SparkPipelineResult run(final Pipeline pipeline) {
result);
metricsPusher.start();

-    if (eventLoggingListener != null && jsc != null) {
-      eventLoggingListener.onApplicationStart(
-          SparkCompat.buildSparkListenerApplicationStart(jsc, pipelineOptions, startTime, result));
-      eventLoggingListener.onApplicationEnd(
-          new SparkListenerApplicationEnd(Instant.now().getMillis()));
-      eventLoggingListener.stop();
-    }
-
return result;
}

SparkBeamMetric.java
@@ -61,16 +61,6 @@ public class SparkBeamMetric implements Metric {
return metrics;
}

-  public static Map<String, String> renderAllToString(MetricResults metricResults) {
-    Map<String, String> metricsString = new HashMap<>();
-    for (Map.Entry<String, ?> entry : renderAll(metricResults).entrySet()) {
-      String key = entry.getKey();
-      String value = String.valueOf(entry.getValue());
-      metricsString.put(key, value);
-    }
-    return metricsString;
-  }
-
Map<String, ?> renderAll() {
MetricResults metricResults =
asAttemptedOnlyMetricResults(MetricsAccumulator.getInstance().value());
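Note: the renderAllToString helper deleted above fed the driverLogs of the SparkListenerApplicationStart built in the SparkCompat code removed further down. Its behavior reduces to a map-to-string conversion; a sketch of an equivalent, assuming an input shaped like renderAll's Map<String, ?> result:

import java.util.Map;
import java.util.stream.Collectors;

class MetricsToString {
  // Sketch equivalent of the deleted renderAllToString: stringify every value.
  static Map<String, String> toStringValues(Map<String, ?> rendered) {
    return rendered.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue())));
  }
}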

SparkCommon.java: this file was deleted.

SparkCompat.java
@@ -17,27 +17,18 @@
*/
package org.apache.beam.runners.spark.util;

-import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.metrics.SparkBeamMetric;
import org.apache.beam.runners.spark.translation.SparkCombineFn;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
-import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import scala.Option;
-import scala.collection.JavaConverters;

/** A set of functions to provide API compatibility between Spark 2 and Spark 3. */
@SuppressWarnings({
@@ -123,56 +114,4 @@ public static <K, InputT, AccumT, OutputT> JavaPairRDD<K, WindowedValue<OutputT>>
throw new RuntimeException("Error invoking Spark flatMapValues", e);
}
}

-  public static SparkListenerApplicationStart buildSparkListenerApplicationStart(
-      final JavaSparkContext jsc, SparkPipelineOptions options, long time, PipelineResult result) {
-    String appName = options.as(ApplicationNameOptions.class).getAppName();
-    Option<String> appId = Option.apply(jsc.getConf().getAppId());
-    Option<String> appAttemptId = Option.apply("1");
-    Option<scala.collection.Map<String, String>> driverLogs =
-        Option.apply(
-            JavaConverters.mapAsScalaMapConverter(
-                    SparkBeamMetric.renderAllToString(result.metrics()))
-                .asScala());
-    try {
-      Class<?> clazz = Class.forName(SparkListenerApplicationStart.class.getName());
-      if (jsc.version().startsWith("3")) {
-        // This invokes by Reflection the equivalent of
-        // return new SparkListenerApplicationStart(
-        //     appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs, driverAttributes);
-        Class<?>[] parameterTypes = {
-          String.class,
-          Option.class,
-          Long.TYPE,
-          String.class,
-          Option.class,
-          Option.class,
-          Option.class
-        };
-        Constructor<?> cons = clazz.getConstructor(parameterTypes);
-        Option<scala.collection.Map<String, String>> driverAttributes =
-            Option.apply(new scala.collection.immutable.HashMap<>());
-        Object[] args = {
-          appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs, driverAttributes
-        };
-        return (SparkListenerApplicationStart) cons.newInstance(args);
-      } else {
-        // This invokes by Reflection the equivalent of
-        // return new SparkListenerApplicationStart(
-        //     appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs);
-        Class<?>[] parameterTypes = {
-          String.class, Option.class, Long.TYPE, String.class, Option.class, Option.class
-        };
-        Constructor<?> cons = clazz.getConstructor(parameterTypes);
-        Object[] args = {appName, appId, time, jsc.sparkUser(), appAttemptId, driverLogs};
-        return (SparkListenerApplicationStart) cons.newInstance(args);
-      }
-    } catch (ClassNotFoundException
-        | NoSuchMethodException
-        | IllegalAccessException
-        | InvocationTargetException
-        | InstantiationException e) {
-      throw new RuntimeException("Error building SparkListenerApplicationStart", e);
-    }
-  }
}
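Note: the deleted buildSparkListenerApplicationStart shows the idiom SparkCompat is built around: resolve a constructor by reflection at runtime so one source tree works against both Spark 2 and Spark 3 (Spark 3 added a seventh driverAttributes argument, hence the branch on jsc.version()). A minimal, self-contained sketch of that idiom, with illustrative names:

import java.lang.reflect.Constructor;

class ReflectiveConstruction {
  // Look up a constructor by its parameter types at runtime and invoke it,
  // avoiding a compile-time dependency on any single version's signature.
  static Object construct(Class<?> clazz, Class<?>[] parameterTypes, Object... args) {
    try {
      Constructor<?> cons = clazz.getConstructor(parameterTypes);
      return cons.newInstance(args);
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Error constructing " + clazz.getName(), e);
    }
  }
}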
