From a71c39f1bfa03e8634d23cc57a8a5b9919afc596 Mon Sep 17 00:00:00 2001
From: "zhangchaoming.zcm"
Date: Thu, 23 Jan 2025 17:52:02 +0800
Subject: [PATCH 1/3] [FLINK-35713] Add parallelism in source and sink

---
 .../content.zh/docs/core-concept/data-sink.md |   1 +
 .../docs/core-concept/data-source.md          |   1 +
 docs/content/docs/core-concept/data-sink.md   |   1 +
 docs/content/docs/core-concept/data-source.md |   1 +
 .../parser/YamlPipelineDefinitionParser.java  |  23 +++-
 .../flink/cdc/composer/PipelineExecution.java |  10 ++
 .../cdc/composer/definition/SinkDef.java      |  30 ++++-
 .../cdc/composer/definition/SourceDef.java    |  21 ++-
 .../composer/flink/FlinkPipelineComposer.java |  12 +-
 .../flink/FlinkPipelineExecution.java         |   5 +
 .../flink/translator/DataSinkTranslator.java  |  17 ++-
 .../flink/FlinkPipelineComposerITCase.java    | 120 ++++++++++++++++++
 .../translator/DataSinkTranslatorTest.java    |   3 +-
 .../sink/DorisMetadataApplierITCase.java      |   3 +-
 .../sink/StarRocksMetadataApplierITCase.java  |   3 +-
 15 files changed, 228 insertions(+), 23 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/data-sink.md b/docs/content.zh/docs/core-concept/data-sink.md
index 2dab1dc4a70..6e07db6a68c 100644
--- a/docs/content.zh/docs/core-concept/data-sink.md
+++ b/docs/content.zh/docs/core-concept/data-sink.md
@@ -35,6 +35,7 @@ To describe a data sink, the follows are required:
 |-----------------------------|-------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the sink, such as doris or starrocks. | required |
 | name | The name of the sink, which is user-defined (a default value provided). | optional |
+| parallelism | The parallelism of the sink (or will `pipeline.parallelism`). | optional |
 | configurations of Data Sink | Configurations to build the Data Sink e.g. connection configurations and sink table properties. | optional |
 
 # Example
diff --git a/docs/content.zh/docs/core-concept/data-source.md b/docs/content.zh/docs/core-concept/data-source.md
index 5d6c33deb82..d22ddd4c02b 100644
--- a/docs/content.zh/docs/core-concept/data-source.md
+++ b/docs/content.zh/docs/core-concept/data-source.md
@@ -35,6 +35,7 @@ To describe a data source, the follows are required:
 |-------------------------------|-----------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the source, such as mysql. | required |
 | name | The name of the source, which is user-defined (a default value provided). | optional |
+| parallelism | The parallelism of the source (or will `pipeline.parallelism`). | optional |
 | configurations of Data Source | Configurations to build the Data Source e.g. connection configurations and source table properties. | optional |
 
 # Example
diff --git a/docs/content/docs/core-concept/data-sink.md b/docs/content/docs/core-concept/data-sink.md
index 2dab1dc4a70..6e07db6a68c 100644
--- a/docs/content/docs/core-concept/data-sink.md
+++ b/docs/content/docs/core-concept/data-sink.md
@@ -35,6 +35,7 @@ To describe a data sink, the follows are required:
 |-----------------------------|-------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the sink, such as doris or starrocks. | required |
 | name | The name of the sink, which is user-defined (a default value provided). | optional |
+| parallelism | The parallelism of the sink (or will `pipeline.parallelism`). | optional |
 | configurations of Data Sink | Configurations to build the Data Sink e.g. connection configurations and sink table properties. | optional |
 
 # Example
diff --git a/docs/content/docs/core-concept/data-source.md b/docs/content/docs/core-concept/data-source.md
index 5d6c33deb82..d22ddd4c02b 100644
--- a/docs/content/docs/core-concept/data-source.md
+++ b/docs/content/docs/core-concept/data-source.md
@@ -35,6 +35,7 @@ To describe a data source, the follows are required:
 |-------------------------------|-----------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the source, such as mysql. | required |
 | name | The name of the source, which is user-defined (a default value provided). | optional |
+| parallelism | The parallelism of the source (or will `pipeline.parallelism`). | optional |
 | configurations of Data Source | Configurations to build the Data Source e.g. connection configurations and source table properties. | optional |
 
 # Example
diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index b594aa35bfc..7c00c027bf0 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -64,6 +64,7 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
     // Source / sink keys
     private static final String TYPE_KEY = "type";
     private static final String NAME_KEY = "name";
+    private static final String PARALLELISM_KEY = "parallelism";
     private static final String INCLUDE_SCHEMA_EVOLUTION_TYPES = "include.schema.changes";
     private static final String EXCLUDE_SCHEMA_EVOLUTION_TYPES = "exclude.schema.changes";
 
@@ -188,7 +189,14 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
         // "name" field is optional
         String name = sourceMap.remove(NAME_KEY);
 
-        return new SourceDef(type, name, Configuration.fromMap(sourceMap));
+        // "parallelism" field is optional
+        Integer parallelismValue = null;
+        String parallelism = sourceMap.remove(PARALLELISM_KEY);
+        if (parallelism != null) {
+            parallelismValue = Integer.parseInt(parallelism);
+        }
+
+        return new SourceDef(type, name, Configuration.fromMap(sourceMap), parallelismValue);
     }
 
     private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
@@ -226,8 +234,7 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
             ((ObjectNode) sinkNode).remove(EXCLUDE_SCHEMA_EVOLUTION_TYPES);
         }
 
-        Map<String, String> sinkMap =
-                mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});
+        Map<String, String> sinkMap = mapper.convertValue(sinkNode, new TypeReference<>() {});
 
         // "type" field is required
         String type =
@@ -239,7 +246,15 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
         // "name" field is optional
         String name = sinkMap.remove(NAME_KEY);
 
-        return new SinkDef(type, name, Configuration.fromMap(sinkMap), declaredSETypes);
+        // "parallelism" field is optional
+        Integer parallelismValue = null;
+        String parallelism = sinkMap.remove(PARALLELISM_KEY);
+        if (parallelism != null) {
+            parallelismValue = Integer.parseInt(parallelism);
+        }
+
+        return new SinkDef(
+                type, name, Configuration.fromMap(sinkMap), declaredSETypes, parallelismValue);
     }
 
     private RouteDef toRouteDef(JsonNode routeNode) {
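With the parser change above, `parallelism` becomes a per-source and per-sink key in the YAML pipeline definition, overriding `pipeline.parallelism` where set. A minimal definition exercising it might look like this sketch (the connector choices and their options are illustrative, not part of this patch):

```yaml
source:
  type: mysql
  name: MySQL Source
  parallelism: 4   # optional; this source runs with 4 parallel subtasks

sink:
  type: doris
  name: Doris Sink
  parallelism: 2   # optional; this sink runs with 2 parallel subtasks

pipeline:
  name: MySQL to Doris Pipeline
  parallelism: 8   # fallback for operators without an explicit value
```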
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineExecution.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineExecution.java
index 8c3d1632be5..a56121089b7 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineExecution.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/PipelineExecution.java
@@ -17,12 +17,22 @@
 
 package org.apache.flink.cdc.composer;
 
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+
 /** A pipeline execution that can be executed by a computing engine. */
 public interface PipelineExecution {
 
     /** Execute the pipeline. */
     ExecutionInfo execute() throws Exception;
 
+    /**
+     * Get the plan of the pipeline.
+     *
+     * @return The plan of the pipeline.
+     */
+    @VisibleForTesting
+    String getPlan() throws Exception;
+
     /** Information of the execution. */
     class ExecutionInfo {
         private final String id;
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SinkDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SinkDef.java
index 10772dabf15..3859caef65e 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SinkDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SinkDef.java
@@ -44,15 +44,16 @@ public class SinkDef {
 
     private final String type;
     @Nullable private final String name;
+    @Nullable private final Integer parallelism;
     private final Configuration config;
     private final Set<SchemaChangeEventType> includedSchemaEvolutionTypes;
 
     public SinkDef(String type, @Nullable String name, Configuration config) {
-        this.type = type;
-        this.name = name;
-        this.config = config;
-        this.includedSchemaEvolutionTypes =
-                Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet());
+        this(
+                type,
+                name,
+                config,
+                Arrays.stream(SchemaChangeEventTypeFamily.ALL).collect(Collectors.toSet()));
     }
 
     public SinkDef(
@@ -60,10 +61,20 @@ public SinkDef(
             @Nullable String name,
             Configuration config,
             Set<SchemaChangeEventType> includedSchemaEvolutionTypes) {
+        this(type, name, config, includedSchemaEvolutionTypes, null);
+    }
+
+    public SinkDef(
+            String type,
+            @Nullable String name,
+            Configuration config,
+            Set<SchemaChangeEventType> includedSchemaEvolutionTypes,
+            @Nullable Integer parallelism) {
         this.type = type;
         this.name = name;
         this.config = config;
         this.includedSchemaEvolutionTypes = includedSchemaEvolutionTypes;
+        this.parallelism = parallelism;
     }
 
     public String getType() {
@@ -82,6 +93,10 @@ public Set<SchemaChangeEventType> getIncludedSchemaEvolutionTypes() {
         return includedSchemaEvolutionTypes;
     }
 
+    public Optional<Integer> getParallelism() {
+        return Optional.ofNullable(parallelism);
+    }
+
     @Override
     public String toString() {
         return "SinkDef{"
@@ -91,6 +106,8 @@ public String toString() {
                 + ", name='"
                 + name
                 + '\''
+                + ", parallelism="
+                + parallelism
                 + ", config="
                 + config
                 + ", includedSchemaEvolutionTypes="
@@ -109,6 +126,7 @@ public boolean equals(Object o) {
         SinkDef sinkDef = (SinkDef) o;
         return Objects.equals(type, sinkDef.type)
                 && Objects.equals(name, sinkDef.name)
                && Objects.equals(parallelism, sinkDef.parallelism)
                 && Objects.equals(config, sinkDef.config)
                 && Objects.equals(
                         includedSchemaEvolutionTypes, sinkDef.includedSchemaEvolutionTypes);
@@ -116,6 +134,6 @@ public boolean equals(Object o) {
 
     @Override
     public int hashCode() {
-        return Objects.hash(type, name, config, includedSchemaEvolutionTypes);
+        return Objects.hash(type, name, config, includedSchemaEvolutionTypes, parallelism);
     }
 }
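The overloaded constructors keep existing call sites source-compatible while exposing the new nullable field. A small sketch of the resulting API (the empty `Configuration` objects are placeholders, and `8` stands in for `pipeline.parallelism`):

```java
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.definition.SourceDef;

import java.util.HashSet;

class ParallelismDefSketch {
    public static void main(String[] args) {
        // Passing a parallelism pins the operator; passing null (or using the
        // shorter constructors) keeps the old inherit-from-pipeline behavior.
        SourceDef source = new SourceDef("mysql", "MySQL Source", new Configuration(), 4);
        SinkDef sink = new SinkDef("doris", "Doris Sink", new Configuration(), new HashSet<>(), 2);

        int pipelineParallelism = 8; // stand-in for pipeline.parallelism
        System.out.println(source.getParallelism().orElse(pipelineParallelism)); // prints 4
        System.out.println(sink.getParallelism().orElse(pipelineParallelism)); // prints 2
    }
}
```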
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SourceDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SourceDef.java
index 5a28281d562..b115b427d3f 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SourceDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/SourceDef.java
@@ -39,12 +39,22 @@ public class SourceDef {
 
     private final String type;
     @Nullable private final String name;
+    @Nullable private final Integer parallelism;
     private final Configuration config;
 
     public SourceDef(String type, @Nullable String name, Configuration config) {
+        this(type, name, config, null);
+    }
+
+    public SourceDef(
+            String type,
+            @Nullable String name,
+            Configuration config,
+            @Nullable Integer parallelism) {
         this.type = type;
         this.name = name;
         this.config = config;
+        this.parallelism = parallelism;
     }
 
     public String getType() {
@@ -59,6 +69,10 @@ public Configuration getConfig() {
         return config;
     }
 
+    public Optional<Integer> getParallelism() {
+        return Optional.ofNullable(parallelism);
+    }
+
     @Override
     public String toString() {
         return "SourceDef{"
@@ -68,6 +82,8 @@ public String toString() {
                 + ", name='"
                 + name
                 + '\''
+                + ", parallelism="
+                + parallelism
                 + ", config="
                 + config
                 + '}';
@@ -84,11 +100,12 @@ public boolean equals(Object o) {
         SourceDef sourceDef = (SourceDef) o;
         return Objects.equals(type, sourceDef.type)
                 && Objects.equals(name, sourceDef.name)
-                && Objects.equals(config, sourceDef.config);
+                && Objects.equals(config, sourceDef.config)
+                && Objects.equals(parallelism, sourceDef.parallelism);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(type, name, config);
+        return Objects.hash(type, name, config, parallelism);
     }
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 533868c565b..25a1b9e617f 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -135,7 +135,11 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
         // O ---> Source
         DataStream<Event> stream =
-                sourceTranslator.translate(pipelineDef.getSource(), dataSource, env, parallelism);
+                sourceTranslator.translate(
+                        pipelineDef.getSource(),
+                        dataSource,
+                        env,
+                        pipelineDef.getSource().getParallelism().orElse(parallelism));
 
         // Source ---> PreTransform
         stream =
@@ -205,7 +209,11 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
 
         // Schema Operator -> Sink -> X
         sinkTranslator.translate(
-                pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
+                pipelineDef.getSink(),
+                stream,
+                dataSink,
+                schemaOperatorIDGenerator.generate(),
+                pipelineDef.getSink().getParallelism().orElse(parallelism));
     }
 
     private void addFrameworkJars() {
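The composer change establishes the precedence rule: an explicit source/sink `parallelism` wins, and the pipeline-wide value is only a fallback. Condensed into a standalone sketch (class and method names are illustrative):

```java
import java.util.Optional;

class ParallelismResolution {
    /** Mirrors pipelineDef.getSource()/getSink().getParallelism().orElse(parallelism). */
    static int resolve(Optional<Integer> definedParallelism, int pipelineParallelism) {
        return definedParallelism.orElse(pipelineParallelism);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Optional.of(9), 4)); // 9: the per-definition value wins
        System.out.println(resolve(Optional.empty(), 4)); // 4: falls back to pipeline.parallelism
    }
}
```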
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineExecution.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineExecution.java
index 018d5546fb9..8910f57ca68 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineExecution.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineExecution.java
@@ -46,4 +46,9 @@ public ExecutionInfo execute() throws Exception {
         }
         return new ExecutionInfo(jobClient.getJobID().toString(), jobName);
     }
+
+    @Override
+    public String getPlan() throws Exception {
+        return env.getExecutionPlan();
+    }
 }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
index efbdb8686ff..9372edc7c12 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java
@@ -81,7 +81,8 @@ public void translate(
             SinkDef sinkDef,
             DataStream<Event> input,
             DataSink dataSink,
-            OperatorID schemaOperatorID) {
+            OperatorID schemaOperatorID,
+            int parallelism) {
         // Get sink provider
         EventSinkProvider eventSinkProvider = dataSink.getEventSinkProvider();
         String sinkName = generateSinkName(sinkDef);
         if (eventSinkProvider instanceof FlinkSinkProvider) {
             // Sink V2
             FlinkSinkProvider sinkProvider = (FlinkSinkProvider) eventSinkProvider;
             Sink<Event> sink = sinkProvider.getSink();
-            sinkTo(input, sink, sinkName, schemaOperatorID);
+            sinkTo(input, sink, sinkName, schemaOperatorID, parallelism);
         } else if (eventSinkProvider instanceof FlinkSinkFunctionProvider) {
             // SinkFunction
             FlinkSinkFunctionProvider sinkFunctionProvider =
                     (FlinkSinkFunctionProvider) eventSinkProvider;
             SinkFunction<Event> sinkFunction = sinkFunctionProvider.getSinkFunction();
-            sinkTo(input, sinkFunction, sinkName, schemaOperatorID);
+            sinkTo(input, sinkFunction, sinkName, schemaOperatorID, parallelism);
         }
     }
 
@@ -104,8 +105,11 @@ void sinkTo(
             DataStream<Event> input,
             Sink<Event> sink,
             String sinkName,
-            OperatorID schemaOperatorID) {
+            OperatorID schemaOperatorID,
+            int parallelism) {
         DataStream<Event> stream = input;
+        stream.getExecutionEnvironment().setParallelism(parallelism);
+
         // Pre-write topology
         if (sink instanceof WithPreWriteTopology) {
             stream = ((WithPreWriteTopology<Event>) sink).addPreWriteTopology(stream);
@@ -125,7 +129,8 @@ private void sinkTo(
             DataStream<Event> input,
             SinkFunction<Event> sinkFunction,
             String sinkName,
-            OperatorID schemaOperatorID) {
+            OperatorID schemaOperatorID,
+            int parallelism) {
         DataSinkFunctionOperator sinkOperator =
                 new DataSinkFunctionOperator(sinkFunction, schemaOperatorID);
         final StreamExecutionEnvironment executionEnvironment = input.getExecutionEnvironment();
@@ -134,7 +139,7 @@ private void sinkTo(
                         input.getTransformation(),
                         SINK_WRITER_PREFIX + sinkName,
                         sinkOperator,
-                        executionEnvironment.getParallelism(),
+                        parallelism,
                         false);
         executionEnvironment.addOperator(transformation);
     }
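The ITCases added next assert on the string returned by the new `getPlan()`, which is Flink's JSON execution plan. For orientation, that plan has roughly this shape (abbreviated; node ids, the exact field set, and ship strategies vary by job and Flink version):

```json
{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Value Source",
      "pact": "Data Source",
      "contents": "Source: Value Source",
      "parallelism": 9
    },
    {
      "id": 3,
      "type": "Sink: Sink Writer: Value Sink",
      "pact": "Data Sink",
      "contents": "Sink: Sink Writer: Value Sink",
      "parallelism": 10,
      "predecessors": [{"id": 1, "ship_strategy": "REBALANCE", "side": "second"}]
    }
  ]
}
```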
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 55a205d495a..f6cf689c3d4 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -42,6 +42,8 @@
 import org.apache.flink.cdc.composer.definition.SinkDef;
 import org.apache.flink.cdc.composer.definition.SourceDef;
 import org.apache.flink.cdc.composer.definition.TransformDef;
+import org.apache.flink.cdc.composer.testsource.factory.DistributedDataSourceFactory;
+import org.apache.flink.cdc.composer.testsource.source.DistributedSourceOptions;
 import org.apache.flink.cdc.connectors.values.ValuesDatabase;
 import org.apache.flink.cdc.connectors.values.factory.ValuesDataFactory;
 import org.apache.flink.cdc.connectors.values.sink.ValuesDataSink;
@@ -52,6 +54,10 @@
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -69,6 +75,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -1525,6 +1532,119 @@ void testMergingDecimalWithVariousPrecisions(ValuesDataSink.SinkApi sinkApi) thr
         assertThat(outputEvents).containsExactlyInAnyOrder(expected);
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testRegularSourceSinkParallelism(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.MULTI_SPLITS_SINGLE_TABLE);
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig, 9);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef =
+                new SinkDef(
+                        ValuesDataFactory.IDENTIFIER,
+                        "Value Sink",
+                        sinkConfig,
+                        new HashSet<>(),
+                        10);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipelineDef);
+        String jsonPlan = execution.getPlan();
+        int sourceParallelism = getParallelismByType(jsonPlan, "Source: Value Source");
+        // SinkV2 or SinkFunction
+        int sinkParallelism = getParallelismByType(jsonPlan, "Sink: Sink Writer: Value Sink");
+        if (sinkParallelism == -1) {
+            sinkParallelism = getParallelismByType(jsonPlan, "Sink Writer: Value Sink");
+        }
+        assertThat(sourceParallelism).isEqualTo(9);
+        assertThat(sinkParallelism).isEqualTo(10);
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testDistributedSourceSinkParallelism(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup distributed source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(DistributedSourceOptions.DISTRIBUTED_TABLES, true);
+        sourceConfig.set(DistributedSourceOptions.TABLE_COUNT, 4);
+        SourceDef sourceDef =
+                new SourceDef(
+                        DistributedDataSourceFactory.IDENTIFIER,
+                        "Distributed Source",
+                        sourceConfig,
+                        9);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef =
+                new SinkDef(
+                        ValuesDataFactory.IDENTIFIER,
+                        "Value Sink",
+                        sinkConfig,
+                        new HashSet<>(),
+                        10);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, MAX_PARALLELISM);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipelineDef);
+        String jsonPlan = execution.getPlan();
+        int sourceParallelism = getParallelismByType(jsonPlan, "Source: Distributed Source");
+        // SinkV2 or SinkFunction
+        int sinkParallelism = getParallelismByType(jsonPlan, "Sink: Sink Writer: Value Sink");
+        if (sinkParallelism == -1) {
+            sinkParallelism = getParallelismByType(jsonPlan, "Sink Writer: Value Sink");
+        }
+        assertThat(sourceParallelism).isEqualTo(9);
+        assertThat(sinkParallelism).isEqualTo(10);
+    }
+
+    private int getParallelismByType(String jsonPlan, String targetType) {
+        JsonElement plan = JsonParser.parseString(jsonPlan);
+        JsonArray nodes = plan.getAsJsonObject().get("nodes").getAsJsonArray();
+        for (JsonElement element : nodes) {
+            JsonObject node = element.getAsJsonObject();
+            if (node.get("type").getAsString().equals(targetType)) {
+                return node.get("parallelism").getAsInt();
+            }
+        }
+        return -1;
+    }
+
     private List<Event> generateTemporalColumnEvents(String tableNamePrefix) {
         List<Event> events = new ArrayList<>();
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java
index d2b8206d5fb..51ab50081cb 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslatorTest.java
@@ -53,7 +53,8 @@ void testPreWriteWithoutCommitSink() {
                 inputStream,
                 mockPreWriteWithoutCommitSink,
                 "testPreWriteWithoutCommitSink",
-                new OperatorID());
+                new OperatorID(),
+                inputStream.getParallelism());
 
         // Check if the `addPreWriteTopology` is called, and the uid is set when the transformation
         // added
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
index 7dd3d4b5eca..a6f74f0c4d2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/test/java/org/apache/flink/cdc/connectors/doris/sink/DorisMetadataApplierITCase.java
@@ -572,7 +572,8 @@ private void runJobWithEvents(List<Event> events) throws Exception {
                 new SinkDef("doris", "Dummy Doris Sink", config),
                 stream,
                 dorisSink,
-                schemaOperatorIDGenerator.generate());
+                schemaOperatorIDGenerator.generate(),
+                stream.getParallelism());
 
         env.execute("Doris Schema Evolution Test");
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
index 8afd11fe7d0..0ae8e9e2ea2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java
@@ -459,7 +459,8 @@ private void runJobWithEvents(List<Event> events) throws Exception {
                 new SinkDef("starrocks", "Dummy StarRocks Sink", config),
                 stream,
                 starRocksSink,
-                schemaOperatorIDGenerator.generate());
+                schemaOperatorIDGenerator.generate(),
+                stream.getParallelism());
 
         env.execute("StarRocks Schema Evolution Test");
     }

From 74a59d888f965b4ae83ec4c8fb5060a3a354cd11 Mon Sep 17 00:00:00 2001
From: "zhangchaoming.zcm"
Date: Thu, 23 Jan 2025 17:54:01 +0800
Subject: [PATCH 2/3] hotfix

---
 docs/content.zh/docs/core-concept/data-sink.md   | 2 +-
 docs/content.zh/docs/core-concept/data-source.md | 2 +-
 docs/content/docs/core-concept/data-sink.md      | 2 +-
 docs/content/docs/core-concept/data-source.md    | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/data-sink.md b/docs/content.zh/docs/core-concept/data-sink.md
index 6e07db6a68c..d48061ddce9 100644
--- a/docs/content.zh/docs/core-concept/data-sink.md
+++ b/docs/content.zh/docs/core-concept/data-sink.md
@@ -35,7 +35,7 @@ To describe a data sink, the follows are required:
 |-----------------------------|-------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the sink, such as doris or starrocks. | required |
 | name | The name of the sink, which is user-defined (a default value provided). | optional |
-| parallelism | The parallelism of the sink (or will `pipeline.parallelism`). | optional |
+| parallelism | The parallelism of the sink (or will use `pipeline.parallelism`). | optional |
 | configurations of Data Sink | Configurations to build the Data Sink e.g. connection configurations and sink table properties. | optional |
 
 # Example
diff --git a/docs/content.zh/docs/core-concept/data-source.md b/docs/content.zh/docs/core-concept/data-source.md
index d22ddd4c02b..39138c62002 100644
--- a/docs/content.zh/docs/core-concept/data-source.md
+++ b/docs/content.zh/docs/core-concept/data-source.md
@@ -35,7 +35,7 @@ To describe a data source, the follows are required:
 |-------------------------------|-----------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the source, such as mysql. | required |
 | name | The name of the source, which is user-defined (a default value provided). | optional |
-| parallelism | The parallelism of the source (or will `pipeline.parallelism`). | optional |
+| parallelism | The parallelism of the source (or will use `pipeline.parallelism`). | optional |
 | configurations of Data Source | Configurations to build the Data Source e.g. connection configurations and source table properties. | optional |
 
 # Example
diff --git a/docs/content/docs/core-concept/data-sink.md b/docs/content/docs/core-concept/data-sink.md
index 6e07db6a68c..d48061ddce9 100644
--- a/docs/content/docs/core-concept/data-sink.md
+++ b/docs/content/docs/core-concept/data-sink.md
@@ -35,7 +35,7 @@ To describe a data sink, the follows are required:
 |-----------------------------|-------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the sink, such as doris or starrocks. | required |
 | name | The name of the sink, which is user-defined (a default value provided). | optional |
-| parallelism | The parallelism of the sink (or will `pipeline.parallelism`). | optional |
+| parallelism | The parallelism of the sink (or will use `pipeline.parallelism`). | optional |
 | configurations of Data Sink | Configurations to build the Data Sink e.g. connection configurations and sink table properties. | optional |
 
 # Example
diff --git a/docs/content/docs/core-concept/data-source.md b/docs/content/docs/core-concept/data-source.md
index d22ddd4c02b..39138c62002 100644
--- a/docs/content/docs/core-concept/data-source.md
+++ b/docs/content/docs/core-concept/data-source.md
@@ -35,7 +35,7 @@ To describe a data source, the follows are required:
 |-------------------------------|-----------------------------------------------------------------------------------------------------|-------------------|
 | type | The type of the source, such as mysql. | required |
 | name | The name of the source, which is user-defined (a default value provided). | optional |
-| parallelism | The parallelism of the source (or will `pipeline.parallelism`). | optional |
+| parallelism | The parallelism of the source (or will use `pipeline.parallelism`). | optional |
 | configurations of Data Source | Configurations to build the Data Source e.g. connection configurations and source table properties. | optional |
 
 # Example

From ef2b60d54c8afa7a12c7cffe47bb2470db987889 Mon Sep 17 00:00:00 2001
From: "zhangchaoming.zcm"
Date: Thu, 13 Feb 2025 19:07:55 +0800
Subject: [PATCH 3/3] minor fix

---
 .../flink/cdc/cli/parser/YamlPipelineDefinitionParser.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 7c00c027bf0..a87ba9d48fe 100644
--- a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -234,7 +234,8 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
             ((ObjectNode) sinkNode).remove(EXCLUDE_SCHEMA_EVOLUTION_TYPES);
         }
 
-        Map<String, String> sinkMap = mapper.convertValue(sinkNode, new TypeReference<>() {});
+        Map<String, String> sinkMap =
+                mapper.convertValue(sinkNode, new TypeReference<Map<String, String>>() {});
 
         // "type" field is required
         String type =
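With the series applied, submission is unchanged: a pipeline definition using the new keys is deployed like any other through the Flink CDC CLI (the YAML file name below is illustrative):

```bash
bash bin/flink-cdc.sh parallelism-demo.yaml
```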