Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35713][common] Support set parallelism in source and sink #3887

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content.zh/docs/core-concept/data-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ under the License.
| name | source 的名称,允许用户配置 (提供了一个默认值)。 | optional |
| configurations of Data Source | 用于构建 source 组件的配置,例如连接参数或者表属性的配置。 | optional |
| parallelism | source 的并发度(如果不设置,将使用 `pipeline.parallelism` 的值)。 | optional |

# 示例
我们可以使用yaml文件来定义一个mysql source:
```yaml
Expand Down
1 change: 1 addition & 0 deletions docs/content/docs/core-concept/data-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ To describe a data sink, the follows are required:
|-----------------------------|-------------------------------------------------------------------------------------------------|-------------------|
| type | The type of the sink, such as doris or starrocks. | required |
| name | The name of the sink, which is user-defined (a default value provided). | optional |
| parallelism                 | The parallelism of the sink. If not set, falls back to `pipeline.parallelism`.                   | optional          |
| configurations of Data Sink | Configurations to build the Data Sink e.g. connection configurations and sink table properties. | optional |

# Example
Expand Down
1 change: 1 addition & 0 deletions docs/content/docs/core-concept/data-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ To describe a data source, the follows are required:
|-------------------------------|-----------------------------------------------------------------------------------------------------|-------------------|
| type | The type of the source, such as mysql. | required |
| name | The name of the source, which is user-defined (a default value provided). | optional |
| parallelism                   | The parallelism of the source. If not set, falls back to `pipeline.parallelism`.                    | optional          |
| configurations of Data Source | Configurations to build the Data Source e.g. connection configurations and source table properties. | optional |

# Example
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
// Source / sink keys
private static final String TYPE_KEY = "type";
private static final String NAME_KEY = "name";
private static final String PARALLELISM_KEY = "parallelism";
private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";

Expand Down Expand Up @@ -188,7 +189,14 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
// "name" field is optional
String name = sourceMap.remove(NAME_KEY);

return new SourceDef(type, name, Configuration.fromMap(sourceMap));
// "parallelism" field is optional
Integer parallelismValue = null;
String parallelism = sourceMap.remove(PARALLELISM_KEY);
if (parallelism != null) {
parallelismValue = Integer.parseInt(parallelism);
}

return new SourceDef(type, name, Configuration.fromMap(sourceMap), parallelismValue);
}

private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
Expand Down Expand Up @@ -239,7 +247,15 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
// "name" field is optional
String name = sinkMap.remove(NAME_KEY);

return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
// "parallelism" field is optional
Integer parallelismValue = null;
String parallelism = sinkMap.remove(PARALLELISM_KEY);
if (parallelism != null) {
parallelismValue = Integer.parseInt(parallelism);
}

return new SinkDef(
type, name, Configuration.fromMap(sinkMap), declaredSETypes, parallelismValue);
}

private RouteDef toRouteDef(JsonNode routeNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@

package org.apache.flink.cdc.composer;

import org.apache.flink.cdc.common.annotation.VisibleForTesting;

/** A pipeline execution that can be executed by a computing engine. */
public interface PipelineExecution {

/**
 * Executes the pipeline.
 *
 * @return information about the execution (see {@link ExecutionInfo})
 * @throws Exception if submitting or executing the pipeline fails
 */
ExecutionInfo execute() throws Exception;

/**
* Get the plan of the pipeline.
*
* @return The plan of the pipeline.
*/
@VisibleForTesting
String getPlan() throws Exception;

/** Information of the execution. */
class ExecutionInfo {
private final String id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,37 @@
public class SinkDef {
private final String type;
@Nullable private final String name;
@Nullable private final Integer parallelism;
private final Configuration config;
private final Set<SchemaChangeEventType> includedSchemaEvolutionTypes;

/**
 * Creates a {@code SinkDef} that accepts every schema change event type and has no explicit
 * parallelism (the composer then falls back to the pipeline-level parallelism).
 *
 * @param type connector type of the sink, e.g. "doris" or "starrocks"
 * @param name user-defined sink name, may be null
 * @param config configuration used to build the data sink
 */
public SinkDef(String type, @Nullable String name, Configuration config) {
    this(
            type,
            name,
            config,
            Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()),
            null);
}

/**
 * Creates a {@code SinkDef} with an explicit set of schema change event types to handle.
 *
 * <p>Delegates to the full constructor with no explicit parallelism, so the pipeline-level
 * parallelism is used downstream.
 *
 * @param type connector type of the sink, e.g. "doris" or "starrocks"
 * @param name user-defined sink name, may be null
 * @param config configuration used to build the data sink
 * @param includedSchemaEvolutionTypes schema change event types this sink should receive
 */
public SinkDef(
        String type,
        @Nullable String name,
        Configuration config,
        Set<SchemaChangeEventType> includedSchemaEvolutionTypes) {
    this(type, name, config, includedSchemaEvolutionTypes, null);
}

/**
 * Full constructor backing all other {@code SinkDef} constructors.
 *
 * @param type connector type of the sink, e.g. "doris" or "starrocks"
 * @param name user-defined sink name, may be null
 * @param config configuration used to build the data sink
 * @param includedSchemaEvolutionTypes schema change event types this sink should receive
 * @param parallelism explicit sink parallelism, or null to use the pipeline default
 */
public SinkDef(
        String type,
        @Nullable String name,
        Configuration config,
        Set<SchemaChangeEventType> includedSchemaEvolutionTypes,
        @Nullable Integer parallelism) {
    this.type = type;
    this.name = name;
    this.parallelism = parallelism;
    this.config = config;
    this.includedSchemaEvolutionTypes = includedSchemaEvolutionTypes;
}

public String getType() {
Expand All @@ -82,6 +93,10 @@ public Set<SchemaChangeEventType> getIncludedSchemaEvolutionTypes() {
return includedSchemaEvolutionTypes;
}

/** Returns the user-configured sink parallelism, or empty if none was set. */
public Optional<Integer> getParallelism() {
    return parallelism == null ? Optional.empty() : Optional.of(parallelism);
}

@Override
public String toString() {
return "SinkDef{"
Expand All @@ -91,6 +106,8 @@ public String toString() {
+ ", name='"
+ name
+ '\''
+ ", parallelism="
+ parallelism
+ ", config="
+ config
+ ", includedSchemaEvolutionTypes="
Expand All @@ -109,13 +126,14 @@ public boolean equals(Object o) {
SinkDef sinkDef = (SinkDef) o;
return Objects.equals(type, sinkDef.type)
&& Objects.equals(name, sinkDef.name)
&& Objects.equals(parallelism, sinkDef.parallelism)
&& Objects.equals(config, sinkDef.config)
&& Objects.equals(
includedSchemaEvolutionTypes, sinkDef.includedSchemaEvolutionTypes);
}

@Override
public int hashCode() {
    // Must stay consistent with equals(): same fields, same order.
    return Arrays.hashCode(
            new Object[] {type, name, config, includedSchemaEvolutionTypes, parallelism});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,22 @@
public class SourceDef {
private final String type;
@Nullable private final String name;
@Nullable private final Integer parallelism;
private final Configuration config;

/**
 * Creates a {@code SourceDef} with no explicit parallelism; the composer then falls back to
 * the pipeline-level parallelism.
 *
 * @param type connector type of the source, e.g. "mysql"
 * @param name user-defined source name, may be null
 * @param config configuration used to build the data source
 */
public SourceDef(String type, @Nullable String name, Configuration config) {
    this(type, name, config, null);
}

/**
 * Full constructor backing all other {@code SourceDef} constructors.
 *
 * @param type connector type of the source, e.g. "mysql"
 * @param name user-defined source name, may be null
 * @param config configuration used to build the data source
 * @param parallelism explicit source parallelism, or null to use the pipeline default
 */
public SourceDef(
        String type,
        @Nullable String name,
        Configuration config,
        @Nullable Integer parallelism) {
    this.type = type;
    this.name = name;
    this.parallelism = parallelism;
    this.config = config;
}

public String getType() {
Expand All @@ -59,6 +69,10 @@ public Configuration getConfig() {
return config;
}

/** Returns the user-configured source parallelism, or empty if none was set. */
public Optional<Integer> getParallelism() {
    return parallelism == null ? Optional.empty() : Optional.of(parallelism);
}

@Override
public String toString() {
return "SourceDef{"
Expand All @@ -68,6 +82,8 @@ public String toString() {
+ ", name='"
+ name
+ '\''
+ ", parallelism="
+ parallelism
+ ", config="
+ config
+ '}';
Expand All @@ -84,11 +100,12 @@ public boolean equals(Object o) {
SourceDef sourceDef = (SourceDef) o;
return Objects.equals(type, sourceDef.type)
&& Objects.equals(name, sourceDef.name)
&& Objects.equals(config, sourceDef.config);
&& Objects.equals(config, sourceDef.config)
&& Objects.equals(parallelism, sourceDef.parallelism);
}

@Override
public int hashCode() {
    // Must stay consistent with equals(): type, name, config, parallelism.
    return Objects.hash(type, name, config, parallelism);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,11 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)

// O ---> Source
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);
sourceTranslator.translate(
pipelineDef.getSource(),
dataSource,
env,
pipelineDef.getSource().getParallelism().orElse(parallelism));

// Source ---> PreTransform
stream =
Expand Down Expand Up @@ -205,7 +209,11 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)

// Schema Operator -> Sink -> X
sinkTranslator.translate(
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
pipelineDef.getSink(),
stream,
dataSink,
schemaOperatorIDGenerator.generate(),
pipelineDef.getSink().getParallelism().orElse(parallelism));
}

private void addFrameworkJars() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ public ExecutionInfo execute() throws Exception {
}
return new ExecutionInfo(jobClient.getJobID().toString(), jobName);
}

@Override
public String getPlan() throws Exception {
    // Delegates to Flink's StreamExecutionEnvironment#getExecutionPlan, which renders the
    // streaming topology as a JSON plan (used by tests to verify operator parallelism).
    return env.getExecutionPlan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,22 @@ public void translate(
SinkDef sinkDef,
DataStream<Event> input,
DataSink dataSink,
OperatorID schemaOperatorID) {
OperatorID schemaOperatorID,
int parallelism) {
// Get sink provider
EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
String sinkName = generateSinkName(sinkDef);
if (eventSinkProvider instanceof FlinkSinkProvider) {
// Sink V2
FlinkSinkProvider sinkProvider = (FlinkSinkProvider) eventSinkProvider;
Sink<Event> sink = sinkProvider.getSink();
sinkTo(input, sink, sinkName, schemaOperatorID);
sinkTo(input, sink, sinkName, schemaOperatorID, parallelism);
} else if (eventSinkProvider instanceof FlinkSinkFunctionProvider) {
// SinkFunction
FlinkSinkFunctionProvider sinkFunctionProvider =
(FlinkSinkFunctionProvider) eventSinkProvider;
SinkFunction<Event> sinkFunction = sinkFunctionProvider.getSinkFunction();
sinkTo(input, sinkFunction, sinkName, schemaOperatorID);
sinkTo(input, sinkFunction, sinkName, schemaOperatorID, parallelism);
}
}

Expand All @@ -104,8 +105,11 @@ void sinkTo(
DataStream<Event> input,
Sink<Event> sink,
String sinkName,
OperatorID schemaOperatorID) {
OperatorID schemaOperatorID,
int parallelism) {
DataStream<Event> stream = input;
stream.getExecutionEnvironment().setParallelism(parallelism);

// Pre-write topology
if (sink instanceof WithPreWriteTopology) {
stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
Expand All @@ -125,7 +129,8 @@ private void sinkTo(
DataStream<Event> input,
SinkFunction<Event> sinkFunction,
String sinkName,
OperatorID schemaOperatorID) {
OperatorID schemaOperatorID,
int parallelism) {
DataSinkFunctionOperator sinkOperator =
new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
final StreamExecutionEnvironment executionEnvironment = input.getExecutionEnvironment();
Expand All @@ -134,7 +139,7 @@ private void sinkTo(
input.getTransformation(),
SINK_WRITER_PREFIX + sinkName,
sinkOperator,
executionEnvironment.getParallelism(),
parallelism,
false);
executionEnvironment.addOperator(transformation);
}
Expand Down
Loading