Improved pipeline translation in SparkStructuredStreamingRunner (#22446)
* Closes #22445: Improved pipeline translation in SparkStructuredStreamingRunner (also closes #22382)
Moritz Mack authored Sep 22, 2022
1 parent 6b7d8b1 · commit 762edd7
Showing 53 changed files with 4,084 additions and 2,281 deletions.
@@ -17,26 +17,24 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;

import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
import org.apache.spark.sql.types.DataType;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.WrappedArray;
import scala.reflect.ClassTag$;

public class EncoderFactory {

  static <T> Encoder<T> create(
      Expression serializer, Expression deserializer, Class<? super T> clazz) {
    // TODO Isolate usage of Scala APIs in utility https://github.com/apache/beam/issues/22382
    List<Expression> serializers = Nil$.MODULE$.$colon$colon(serializer);
    return new ExpressionEncoder<>(
        SchemaHelpers.binarySchema(),
        false,
        serializers,
        listOf(serializer),
        deserializer,
        ClassTag$.MODULE$.apply(clazz));
  }
@@ -46,6 +44,6 @@ static <T> Encoder<T> create(
   * input arg is {@code null}.
   */
  static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, Expression... args) {
    return new StaticInvoke(cls, type, fun, new WrappedArray.ofRef<>(args), true, true);
    return new StaticInvoke(cls, type, fun, seqOf(args), true, true);
  }
}
@@ -15,17 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
package org.apache.beam.runners.spark.structuredstreaming.translation.utils;

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.function.MapFunction;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.WrappedArray;

/** Helper functions for working with {@link org.apache.beam.sdk.values.KV}. */
public final class KVHelpers {
/** Utilities for easier interoperability with the Spark Scala API. */
public class ScalaInterop {
  private ScalaInterop() {}

  /** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
  public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
    return wv -> wv.getValue().getKey();
  public static <T> Seq<T> seqOf(T... t) {
    return new WrappedArray.ofRef<>(t);
  }

  public static <T> Seq<T> listOf(T t) {
    return emptyList().$colon$colon(t);
  }

  public static <T> List<T> emptyList() {
    return (List<T>) Nil$.MODULE$;
  }
}
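
For illustration only (not part of this commit), a minimal sketch of how the new ScalaInterop helpers might be called from Java; the example class and its output comments are hypothetical:

import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import scala.collection.Seq;

public class ScalaInteropExample {
  public static void main(String[] args) {
    // seqOf wraps the varargs array in a Scala Seq (a WrappedArray, no copying).
    Seq<String> seq = ScalaInterop.seqOf("a", "b", "c");
    System.out.println(seq.size()); // 3

    // listOf prepends a single element to the empty immutable Scala list (Nil).
    Seq<String> single = ScalaInterop.listOf("only");
    System.out.println(single.head()); // only
  }
}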

This file was deleted.

@@ -29,10 +29,9 @@
import org.apache.beam.runners.spark.structuredstreaming.metrics.CompositeSource;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -41,31 +40,42 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SparkStructuredStreamingRunner is based on spark structured streaming framework and is no more
* based on RDD/DStream API. See
* https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html It is still
* experimental, its coverage of the Beam model is partial. The SparkStructuredStreamingRunner
* translate operations defined on a pipeline to a representation executable by Spark, and then
* submitting the job to Spark to be executed. If we wanted to run a Beam pipeline with the default
* options of a single threaded spark instance in local mode, we would do the following:
* A Spark runner built on top of Spark's SQL Engine (<a
* href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">Structured
* Streaming framework</a>).
*
* <p>{@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineResult
* result = (SparkStructuredStreamingPipelineResult) p.run(); }
* <p><b>This runner is experimental; its coverage of the Beam model is still partial. Due to
* limitations of the Structured Streaming framework (e.g. lack of support for multiple stateful
* operators), streaming mode is not yet supported by this runner.</b>
*
* <p>The runner translates transforms defined on a Beam pipeline to Spark `Dataset` transformations
* (leveraging the high level Dataset API) and then submits these to Spark to be executed.
*
* <p>To run a Beam pipeline with the default options using Spark's local mode, we would do the
* following:
*
* <pre>{@code
* Pipeline p = [logic for pipeline creation]
* PipelineResult result = p.run();
* }</pre>
*
* <p>To create a pipeline runner to run against a different Spark cluster, with a custom master URL,
* we would do the following:
*
* <p>{@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineOptions
* options = SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
* SparkStructuredStreamingPipelineResult result = (SparkStructuredStreamingPipelineResult) p.run();
* }
* <pre>{@code
* Pipeline p = [logic for pipeline creation]
* SparkCommonPipelineOptions options = p.getOptions().as(SparkCommonPipelineOptions.class);
* options.setSparkMaster("spark://host:port");
* PipelineResult result = p.run();
* }</pre>
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -135,7 +145,7 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
    AggregatorsAccumulator.clear();
    MetricsAccumulator.clear();

    final AbstractTranslationContext translationContext = translatePipeline(pipeline);
    final TranslationContext translationContext = translatePipeline(pipeline);

    final ExecutorService executorService = Executors.newSingleThreadExecutor();
    final Future<?> submissionFuture =
@@ -169,8 +179,10 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
    return result;
  }

  private AbstractTranslationContext translatePipeline(Pipeline pipeline) {
  private TranslationContext translatePipeline(Pipeline pipeline) {
    PipelineTranslator.detectTranslationMode(pipeline, options);
    Preconditions.checkArgument(
        !options.isStreaming(), "%s does not support streaming pipelines.", getClass().getName());

    // Default to using the primitive versions of Read.Bounded and Read.Unbounded for non-portable
    // execution.
@@ -182,10 +194,7 @@ private AbstractTranslationContext translatePipeline(Pipeline pipeline) {

    PipelineTranslator.replaceTransforms(pipeline, options);
    prepareFilesToStage(options);
    PipelineTranslator pipelineTranslator =
        options.isStreaming()
            ? new PipelineTranslatorStreaming(options)
            : new PipelineTranslatorBatch(options);
    PipelineTranslator pipelineTranslator = new PipelineTranslatorBatch(options);

    final JavaSparkContext jsc =
        JavaSparkContext.fromSparkContext(
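
For reference, a minimal sketch (not part of this commit) of launching a batch pipeline on this runner after the change; the pipeline contents are placeholders, and the runner's package is assumed from the translation packages shown in the diff:

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class StructuredStreamingRunnerExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Select the Structured Streaming based runner; only batch pipelines are supported,
    // streaming pipelines are rejected by the precondition added in translatePipeline.
    options.setRunner(SparkStructuredStreamingRunner.class);

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("hello", "beam"));
    p.run().waitUntilFinish();
  }
}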
