Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4106] Add FileStagingOptions and merge staging file options between runners #14423

Merged
merged 1 commit into from
Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
*/
package org.apache.beam.runners.flink;

import java.util.List;
import org.apache.beam.sdk.options.ApplicationNameOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
Expand All @@ -32,26 +32,12 @@
* requiring flink on the classpath (e.g. to use with the direct runner).
*/
public interface FlinkPipelineOptions
extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions {

String AUTO = "[auto]";
String PIPELINED = "PIPELINED";
String EXACTLY_ONCE = "EXACTLY_ONCE";

/**
* List of local files to make available to workers.
*
* <p>Jars are placed on the worker's classpath.
*
* <p>The default value is the list of jars from the main program's classpath.
*/
@Description(
"Jar-Files to send to all workers and put on the classpath. "
+ "The default value is all files from the classpath.")
List<String> getFilesToStage();

void setFilesToStage(List<String> value);

/**
* The url of the Flink JobManager on which to execute pipelines. This can either be the
* address of a cluster JobManager, in the form "host:port" or one of the special Strings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
package org.apache.beam.runners.dataflow.options;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunnerInfo;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Options that are used to configure the Dataflow pipeline worker pool. */
@Description("Options that are used to configure the Dataflow pipeline worker pool.")
public interface DataflowPipelineWorkerPoolOptions extends GcpOptions {
public interface DataflowPipelineWorkerPoolOptions extends GcpOptions, FileStagingOptions {
/**
* Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling
* algorithm other than {@code NONE} will affect the size of the worker pool. If left unspecified,
Expand Down Expand Up @@ -184,21 +184,6 @@ public String create(PipelineOptions options) {

void setWorkerMachineType(String value);

/**
* List of local files to make available to workers.
*
* <p>Files are placed on the worker's classpath.
*
* <p>The default value is the list of jars from the main program's classpath.
*/
@Description(
"Files to stage on GCS and make available to workers. "
+ "Files are placed on the worker's classpath. "
+ "The default value is all files from the classpath.")
List<String> getFilesToStage();

void setFilesToStage(List<String> value);

/**
* Specifies what type of persistent disk is used. The value is a full disk type resource, e.g.,
* compute.googleapis.com/projects//zones//diskTypes/pd-ssd. For more information, see the <a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.DefaultValueFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
Expand All @@ -35,7 +36,8 @@
* master address, and other user-related knobs.
*/
public interface SparkCommonPipelineOptions
extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
extends PipelineOptions, StreamingOptions, ApplicationNameOptions, FileStagingOptions {

String DEFAULT_MASTER_URL = "local[4]";

@Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
Expand All @@ -52,20 +54,6 @@ public interface SparkCommonPipelineOptions

void setCheckpointDir(String checkpointDir);

/**
* List of local files to make available to workers.
*
* <p>Jars are placed on the worker's classpath.
*
* <p>The default value is the list of jars from the main program's classpath.
*/
@Description(
"Jar-Files to send to all workers and put on the classpath. "
+ "The default value is all files from the classpath.")
List<String> getFilesToStage();

void setFilesToStage(List<String> value);

@Description("Enable/disable sending aggregator values to Spark's metric sinks")
@Default.Boolean(true)
Boolean getEnableSparkMetricSinks();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import edu.iu.dsc.tws.tset.env.TSetEnvironment;
import java.util.List;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.FileStagingOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;

/** Twister2PipelineOptions. */
public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions {
public interface Twister2PipelineOptions
extends PipelineOptions, StreamingOptions, FileStagingOptions {

@Description("set parallelism for Twister2 processor")
@Default.Integer(1)
Expand All @@ -46,12 +47,6 @@ public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptio

void setClusterType(String name);

@Description(
"Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath.")
List<String> getFilesToStage();

void setFilesToStage(List<String> value);

@Description("Job file zip")
String getJobFileZip();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.options;

import java.util.List;
import org.apache.beam.sdk.annotations.Experimental;

/**
* File staging related options.
*
* <p>Declares the single {@code filesToStage} property (getter and setter pair) so that option
* interfaces which need it can extend this interface instead of each re-declaring the same
* property.
*/
@Experimental
public interface FileStagingOptions extends PipelineOptions {
/**
* List of local files to make available to workers.
*
* <p>Files are placed on the worker's classpath.
*
* <p>The default value is the list of jars from the main program's classpath.
*
* <p>NOTE(review): no default-value annotation is declared on this getter, so the default
* described above is presumably supplied elsewhere (e.g. by the runner) -- confirm.
*
* @return the files to stage, as a list of strings
*/
@Description(
"Files to stage to the artifact service and make available to workers. Files are placed on "
+ "the worker's classpath. The default value is all files from the classpath.")
List<String> getFilesToStage();

/**
* Setter counterpart of {@link #getFilesToStage()}.
*
* @param value the list of local files to make available to workers
*/
void setFilesToStage(List<String> value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,7 @@

/** Pipeline options common to all portable runners. */
@Experimental(Kind.PORTABILITY)
public interface PortablePipelineOptions extends PipelineOptions {

// TODO: https://issues.apache.org/jira/browse/BEAM-4106: Consider pulling this out into a new
// options interface, e.g., FileStagingOptions.
/**
* List of local files to make available to workers.
*
* <p>Files are placed on the worker's classpath.
*
* <p>The default value is the list of jars from the main program's classpath.
*/
@Description(
"Files to stage to the artifact service and make available to workers. Files are placed on "
+ "the worker's classpath. The default value is all files from the classpath.")
List<String> getFilesToStage();

void setFilesToStage(List<String> value);
public interface PortablePipelineOptions extends PipelineOptions, FileStagingOptions {

@Description(
"Job service endpoint to use. Should be in the form of address and port, e.g. localhost:3000")
Expand Down