Improved pipeline translation in SparkStructuredStreamingRunner #22446

Merged

EncoderFactory.java
@@ -17,26 +17,24 @@
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;

import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.listOf;
import static org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop.seqOf;

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke;
import org.apache.spark.sql.types.DataType;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.WrappedArray;
import scala.reflect.ClassTag$;

public class EncoderFactory {

static <T> Encoder<T> create(
Expression serializer, Expression deserializer, Class<? super T> clazz) {
// TODO Isolate usage of Scala APIs in utility https://github.com/apache/beam/issues/22382
List<Expression> serializers = Nil$.MODULE$.$colon$colon(serializer);
return new ExpressionEncoder<>(
SchemaHelpers.binarySchema(),
false,
serializers,
listOf(serializer),
deserializer,
ClassTag$.MODULE$.apply(clazz));
}
@@ -46,6 +44,6 @@ static <T> Encoder<T> create(
* input arg is {@code null}.
*/
static Expression invokeIfNotNull(Class<?> cls, String fun, DataType type, Expression... args) {
return new StaticInvoke(cls, type, fun, new WrappedArray.ofRef<>(args), true, true);
return new StaticInvoke(cls, type, fun, seqOf(args), true, true);
}
}
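
For reference, a hedged sketch (not part of this PR; the class name and the placeholder expression are illustrative) of why the replacement above is behavior-preserving: the new ScalaInterop helpers build the same Scala collections the removed inline calls built.

import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.functions;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.WrappedArray;

class EncoderFactoryEquivalenceSketch {
  static void demo() {
    Expression serializer = functions.lit(42).expr(); // placeholder expression

    // Removed style: prepend the serializer to the empty immutable list (Nil).
    scala.collection.immutable.List<Expression> oldList = Nil$.MODULE$.$colon$colon(serializer);
    // New style: the helper added in this PR builds the same singleton list.
    Seq<Expression> newList = ScalaInterop.listOf(serializer);

    // Removed style: wrap a Java array as a Scala Seq for StaticInvoke's argument list.
    Seq<Expression> oldArgs = new WrappedArray.ofRef<>(new Expression[] {serializer});
    // New style: the varargs helper produces an equivalent Seq.
    Seq<Expression> newArgs = ScalaInterop.seqOf(serializer);
  }
}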

KVHelpers.java → ScalaInterop.java
@@ -15,17 +15,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.spark.structuredstreaming.translation.helpers;
package org.apache.beam.runners.spark.structuredstreaming.translation.utils;

import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.function.MapFunction;
import scala.collection.Seq;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.WrappedArray;

/** Helper functions for working with {@link org.apache.beam.sdk.values.KV}. */
public final class KVHelpers {
/** Utilities for easier interoperability with the Spark Scala API. */
public class ScalaInterop {
private ScalaInterop() {}

/** A Spark {@link MapFunction} for extracting the key out of a {@link KV} for GBK for example. */
public static <K, V> MapFunction<WindowedValue<KV<K, V>>, K> extractKey() {
return wv -> wv.getValue().getKey();
public static <T> Seq<T> seqOf(T... t) {
return new WrappedArray.ofRef<>(t);
}

public static <T> Seq<T> listOf(T t) {
return emptyList().$colon$colon(t);
}

public static <T> List<T> emptyList() {
return (List<T>) Nil$.MODULE$;
}
}
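
A brief, hedged usage sketch of the helpers above from Java (not part of the PR; names are illustrative):

import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import scala.collection.Seq;
import scala.collection.immutable.List;

class ScalaInteropUsageSketch {
  static void demo() {
    // seqOf wraps the Java varargs array as a Scala Seq (a WrappedArray view of the array).
    Seq<String> args = ScalaInterop.seqOf("a", "b", "c");

    // listOf prepends a single element to Nil, yielding an immutable singleton list.
    Seq<String> single = ScalaInterop.listOf("only");

    // emptyList returns the shared immutable empty list (Nil), cast to the requested element type.
    List<String> none = ScalaInterop.emptyList();
  }
}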

(A file was deleted in this PR; its name and contents are not shown in this view.)


SparkStructuredStreamingRunner.java
@@ -29,10 +29,9 @@
import org.apache.beam.runners.spark.structuredstreaming.metrics.CompositeSource;
import org.apache.beam.runners.spark.structuredstreaming.metrics.MetricsAccumulator;
import org.apache.beam.runners.spark.structuredstreaming.metrics.SparkBeamMetricSource;
import org.apache.beam.runners.spark.structuredstreaming.translation.AbstractTranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.PipelineTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
import org.apache.beam.runners.spark.structuredstreaming.translation.batch.PipelineTranslatorBatch;
import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.PipelineTranslatorStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -41,31 +40,42 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.SparkEnv$;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.metrics.MetricsSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* SparkStructuredStreamingRunner is based on spark structured streaming framework and is no more
* based on RDD/DStream API. See
* https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html It is still
* experimental, its coverage of the Beam model is partial. The SparkStructuredStreamingRunner
* translate operations defined on a pipeline to a representation executable by Spark, and then
* submitting the job to Spark to be executed. If we wanted to run a Beam pipeline with the default
* options of a single threaded spark instance in local mode, we would do the following:
* A Spark runner built on top of Spark's SQL Engine (<a
* href="https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html">Structured
* Streaming framework</a>).
*
* <p>{@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineResult
* result = (SparkStructuredStreamingPipelineResult) p.run(); }
* <p><b>This runner is experimental, and its coverage of the Beam model is still partial. Due to
* limitations of the Structured Streaming framework (e.g. lack of support for multiple stateful
* operators), streaming mode is not yet supported by this runner.</b>
*
* <p>The runner translates transforms defined on a Beam pipeline to Spark {@code Dataset}
* transformations (leveraging the high-level Dataset API) and then submits these to Spark to be
* executed.
*
* <p>To run a Beam pipeline with the default options using Spark's local mode, we would do the
* following:
*
* <pre>{@code
* Pipeline p = [logic for pipeline creation]
* PipelineResult result = p.run();
* }</pre>
*
* <p>To create a pipeline runner to run against a different Spark cluster, with a custom master
* URL, we would do the following:
*
* <p>{@code Pipeline p = [logic for pipeline creation] SparkStructuredStreamingPipelineOptions
* options = SparkPipelineOptionsFactory.create(); options.setSparkMaster("spark://host:port");
* SparkStructuredStreamingPipelineResult result = (SparkStructuredStreamingPipelineResult) p.run();
* }
* <pre>{@code
* Pipeline p = [logic for pipeline creation]
* SparkCommonPipelineOptions options = p.getOptions().as(SparkCommonPipelineOptions.class);
* options.setSparkMaster("spark://host:port");
* PipelineResult result = p.run();
* }</pre>
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -135,7 +145,7 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
AggregatorsAccumulator.clear();
MetricsAccumulator.clear();

final AbstractTranslationContext translationContext = translatePipeline(pipeline);
final TranslationContext translationContext = translatePipeline(pipeline);

final ExecutorService executorService = Executors.newSingleThreadExecutor();
final Future<?> submissionFuture =
@@ -169,8 +179,10 @@ public SparkStructuredStreamingPipelineResult run(final Pipeline pipeline) {
return result;
}

private AbstractTranslationContext translatePipeline(Pipeline pipeline) {
private TranslationContext translatePipeline(Pipeline pipeline) {
PipelineTranslator.detectTranslationMode(pipeline, options);
Preconditions.checkArgument(
!options.isStreaming(), "%s does not support streaming pipelines.", getClass().getName());

// Default to using the primitive versions of Read.Bounded and Read.Unbounded for non-portable
// execution.
@@ -182,10 +194,7 @@ private AbstractTranslationContext translatePipeline(Pipeline pipeline) {

PipelineTranslator.replaceTransforms(pipeline, options);
prepareFilesToStage(options);
PipelineTranslator pipelineTranslator =
options.isStreaming()
? new PipelineTranslatorStreaming(options)
: new PipelineTranslatorBatch(options);
PipelineTranslator pipelineTranslator = new PipelineTranslatorBatch(options);

final JavaSparkContext jsc =
JavaSparkContext.fromSparkContext(
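
Finally, a hedged sketch of the behavior introduced by the new Preconditions.checkArgument guard in translatePipeline: a streaming pipeline now fails fast with an IllegalArgumentException instead of reaching the removed streaming translator. This is an illustration, not a test from the PR; the class and test names are invented.

import static org.junit.Assert.assertThrows;

import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingPipelineOptions;
import org.apache.beam.runners.spark.structuredstreaming.SparkStructuredStreamingRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Test;

public class StreamingRejectionSketchTest {
  @Test
  public void streamingPipelineIsRejected() {
    SparkStructuredStreamingPipelineOptions options =
        PipelineOptionsFactory.as(SparkStructuredStreamingPipelineOptions.class);
    options.setRunner(SparkStructuredStreamingRunner.class);
    options.setStreaming(true); // force streaming mode

    Pipeline p = Pipeline.create(options);
    // Translation happens synchronously in run(), so the guard fires before job submission.
    assertThrows(IllegalArgumentException.class, p::run);
  }
}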