diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index b0b364ac3ed2..b097b9c8b4f3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -17,10 +17,10 @@ */ package org.apache.beam.runners.flink; -import java.util.List; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.FileStagingOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.StreamingOptions; @@ -32,26 +32,12 @@ * requiring flink on the classpath (e.g. to use with the direct runner). */ public interface FlinkPipelineOptions - extends PipelineOptions, ApplicationNameOptions, StreamingOptions { + extends PipelineOptions, ApplicationNameOptions, StreamingOptions, FileStagingOptions { String AUTO = "[auto]"; String PIPELINED = "PIPELINED"; String EXACTLY_ONCE = "EXACTLY_ONCE"; - /** - * List of local files to make available to workers. - * - *

Jars are placed on the worker's classpath. - * - *

The default value is the list of jars from the main program's classpath. - */ - @Description( - "Jar-Files to send to all workers and put on the classpath. " - + "The default value is all files from the classpath.") - List getFilesToStage(); - - void setFilesToStage(List value); - /** * The url of the Flink JobManager on which to execute pipelines. This can either be the the * address of a cluster JobManager, in the form "host:port" or one of the special Strings diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java index 89892d0ab003..751bca63f32a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.options; import com.fasterxml.jackson.annotation.JsonIgnore; -import java.util.List; import org.apache.beam.runners.dataflow.DataflowRunnerInfo; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -26,13 +25,14 @@ import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.FileStagingOptions; import org.apache.beam.sdk.options.Hidden; import org.apache.beam.sdk.options.PipelineOptions; import org.checkerframework.checker.nullness.qual.Nullable; /** Options that are used to configure the Dataflow pipeline worker pool. */ @Description("Options that are used to configure the Dataflow pipeline worker pool.") -public interface DataflowPipelineWorkerPoolOptions extends GcpOptions { +public interface DataflowPipelineWorkerPoolOptions extends GcpOptions, FileStagingOptions { /** * Number of workers to use when executing the Dataflow job. Note that selection of an autoscaling * algorithm other then {@code NONE} will affect the size of the worker pool. If left unspecified, @@ -184,21 +184,6 @@ public String create(PipelineOptions options) { void setWorkerMachineType(String value); - /** - * List of local files to make available to workers. - * - *

Files are placed on the worker's classpath. - * - *

The default value is the list of jars from the main program's classpath. - */ - @Description( - "Files to stage on GCS and make available to workers. " - + "Files are placed on the worker's classpath. " - + "The default value is all files from the classpath.") - List getFilesToStage(); - - void setFilesToStage(List value); - /** * Specifies what type of persistent disk is used. The value is a full disk type resource, e.g., * compute.googleapis.com/projects//zones//diskTypes/pd-ssd. For more information, see the Jars are placed on the worker's classpath. - * - *

The default value is the list of jars from the main program's classpath. - */ - @Description( - "Jar-Files to send to all workers and put on the classpath. " - + "The default value is all files from the classpath.") - List getFilesToStage(); - - void setFilesToStage(List value); - @Description("Enable/disable sending aggregator values to Spark's metric sinks") @Default.Boolean(true) Boolean getEnableSparkMetricSinks(); diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java index 0782271f506e..f69c46291ace 100644 --- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java +++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/Twister2PipelineOptions.java @@ -19,14 +19,15 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import edu.iu.dsc.tws.tset.env.TSetEnvironment; -import java.util.List; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.FileStagingOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; /** Twister2PipelineOptions. */ -public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptions { +public interface Twister2PipelineOptions + extends PipelineOptions, StreamingOptions, FileStagingOptions { @Description("set parallelism for Twister2 processor") @Default.Integer(1) @@ -46,12 +47,6 @@ public interface Twister2PipelineOptions extends PipelineOptions, StreamingOptio void setClusterType(String name); - @Description( - "Jar-Files to send to all workers and put on the classpath. The default value is all files from the classpath.") - List getFilesToStage(); - - void setFilesToStage(List value); - @Description("Job file zip") String getJobFileZip(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java new file mode 100644 index 000000000000..d480d123bc20 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/FileStagingOptions.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.options; + +import java.util.List; +import org.apache.beam.sdk.annotations.Experimental; + +/** File staging related options. */ +@Experimental +public interface FileStagingOptions extends PipelineOptions { + /** + * List of local files to make available to workers. + * + *

Files are placed on the worker's classpath. + * + *

The default value is the list of jars from the main program's classpath. + */ + @Description( + "Files to stage to the artifact service and make available to workers. Files are placed on " + + "the worker's classpath. The default value is all files from the classpath.") + List getFilesToStage(); + + void setFilesToStage(List value); +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java index 16bac4bb47a4..2df906c25d25 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java @@ -25,23 +25,7 @@ /** Pipeline options common to all portable runners. */ @Experimental(Kind.PORTABILITY) -public interface PortablePipelineOptions extends PipelineOptions { - - // TODO: https://issues.apache.org/jira/browse/BEAM-4106: Consider pulling this out into a new - // options interface, e.g., FileStagingOptions. - /** - * List of local files to make available to workers. - * - *

Files are placed on the worker's classpath. - * - *

The default value is the list of jars from the main program's classpath. - */ - @Description( - "Files to stage to the artifact service and make available to workers. Files are placed on " - + "the worker's classpath. The default value is all files from the classpath.") - List getFilesToStage(); - - void setFilesToStage(List value); +public interface PortablePipelineOptions extends PipelineOptions, FileStagingOptions { @Description( "Job service endpoint to use. Should be in the form of address and port, e.g. localhost:3000")