From bdf73b26501de62799c4968c197a6ab085019bb7 Mon Sep 17 00:00:00 2001
From: JerryYue-M <272614347@qq.com>
Date: Sat, 2 Jul 2022 08:38:46 +0800
Subject: [PATCH] [HUDI-3953]Flink Hudi module should support low-level source
 and sink api (#5445)

Co-authored-by: jerryyue
---
 .../org/apache/hudi/util/HoodiePipeline.java  | 271 ++++++++++++++++++
 .../hudi/sink/ITTestDataStreamWrite.java      | 126 ++++++++
 2 files changed, 397 insertions(+)
 create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
new file mode 100644
index 0000000000000..d23a278876d1e
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/HoodiePipeline.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTableFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A tool class to construct hoodie flink pipeline.
+ *
+ * <p>How to use?</p>
+ * Method {@link #builder(String)} returns a pipeline builder. The builder
+ * can then define the hudi table columns, primary keys and partitions.
+ *
+ * <p>An example:</p>
+ * <pre>
+ *
+ *    HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
+ *    DataStreamSink<?> sinkStream = builder
+ *        .column("f0 int")
+ *        .column("f1 varchar(10)")
+ *        .column("f2 varchar(20)")
+ *        .pk("f0,f1")
+ *        .partition("f2")
+ *        .sink(input, false);
+ *  </pre>
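+ *
+ * <p>The builder can also be used on the read path. A minimal sketch of
+ * consuming the same table as a {@code DataStream} source is shown below;
+ * the column list simply mirrors the sink example above, and {@code execEnv}
+ * is assumed to be an existing {@code StreamExecutionEnvironment}:
+ * <pre>
+ *
+ *    DataStream<RowData> readStream = HoodiePipeline.builder("myTable")
+ *        .column("f0 int")
+ *        .column("f1 varchar(10)")
+ *        .column("f2 varchar(20)")
+ *        .pk("f0,f1")
+ *        .partition("f2")
+ *        .source(execEnv);
+ *  </pre>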
+ */
+public class HoodiePipeline {
+
+  private static final Logger LOG = LogManager.getLogger(HoodiePipeline.class);
+
+  /**
+   * Returns the builder for hoodie pipeline construction.
+   */
+  public static Builder builder(String tableName) {
+    return new Builder(tableName);
+  }
+
+  /**
+   * Builder for hudi source/sink pipeline construction.
+   */
+  public static class Builder {
+    private final String tableName;
+    private final List<String> columns;
+    private final Map<String, String> options;
+
+    private String pk;
+    private List<String> partitions;
+
+    private Builder(String tableName) {
+      this.tableName = tableName;
+      this.columns = new ArrayList<>();
+      this.options = new HashMap<>();
+      this.partitions = new ArrayList<>();
+    }
+
+    /**
+     * Add a table column definition.
+     *
+     * @param column the column format should be in the form like 'f0 int'
+     */
+    public Builder column(String column) {
+      this.columns.add(column);
+      return this;
+    }
+
+    /**
+     * Add primary keys.
+     */
+    public Builder pk(String... pks) {
+      this.pk = String.join(",", pks);
+      return this;
+    }
+
+    /**
+     * Add partition fields.
+     */
+    public Builder partition(String... partitions) {
+      this.partitions = new ArrayList<>(Arrays.asList(partitions));
+      return this;
+    }
+
+    /**
+     * Add a config option.
+     */
+    public Builder option(ConfigOption<?> option, Object val) {
+      this.options.put(option.key(), val.toString());
+      return this;
+    }
+
+    public Builder option(String key, Object val) {
+      this.options.put(key, val.toString());
+      return this;
+    }
+
+    public Builder options(Map<String, String> options) {
+      this.options.putAll(options);
+      return this;
+    }
+
+    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
+      TableDescriptor tableDescriptor = getTableDescriptor();
+      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getResolvedCatalogTable(), bounded);
+    }
+
+    public TableDescriptor getTableDescriptor() {
+      EnvironmentSettings environmentSettings = EnvironmentSettings
+          .newInstance()
+          .build();
+      TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
+      String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
+      tableEnv.executeSql(sql);
+      String currentCatalog = tableEnv.getCurrentCatalog();
+      ResolvedCatalogTable catalogTable = null;
+      String defaultDatabase = null;
+      try {
+        Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
+        defaultDatabase = catalog.getDefaultDatabase();
+        catalogTable = (ResolvedCatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
+      } catch (TableNotExistException e) {
+        throw new HoodieException("Create table " + this.tableName + " exception", e);
+      }
+      ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
+      return new TableDescriptor(tableId, catalogTable);
+    }
+
+    public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
+      TableDescriptor tableDescriptor = getTableDescriptor();
+      return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getResolvedCatalogTable());
+    }
+  }
+
+  private static String getCreateHoodieTableDDL(
+      String tableName,
+      List<String> fields,
+      Map<String, String> options,
+      String pkField,
+      List<String> partitionField) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("create table ")
+        .append(tableName)
+        .append("(\n");
+    for (String field : fields) {
+      builder.append("  ")
+          .append(field)
+          .append(",\n");
+    }
+    builder.append("  PRIMARY KEY(")
+        .append(pkField)
+        .append(") NOT ENFORCED\n")
+        .append(")\n");
+    if (!partitionField.isEmpty()) {
+      String partitons = partitionField
+          .stream()
+          .map(partitionName -> "`" + partitionName + "`")
+          .collect(Collectors.joining(","));
+      builder.append("PARTITIONED BY (")
+          .append(partitons)
+          .append(")\n");
+    }
+    builder.append("with ('connector' = 'hudi'");
+    options.forEach((k, v) -> builder
+        .append(",\n")
+        .append("  '")
+        .append(k)
+        .append("' = '")
+        .append(v)
+        .append("'"));
+    builder.append("\n)");
+    return builder.toString();
+  }
+
+  /**
+   * Returns the data stream sink with given catalog table.
+   *
+   * @param input        The input datastream
+   * @param tablePath    The table path to the hoodie table in the catalog
+   * @param catalogTable The hoodie catalog table
+   * @param isBounded    A flag indicating whether the input data stream is bounded
+   */
+  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable, boolean isBounded) {
+    FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
+    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
+    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
+        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
+        .consumeDataStream(input);
+  }
+
+  /**
+   * Returns the data stream source with given catalog table.
+   *
+   * @param execEnv      The execution environment
+   * @param tablePath    The table path to the hoodie table in the catalog
+   * @param catalogTable The hoodie catalog table
+   */
+  private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, ResolvedCatalogTable catalogTable) {
+    FactoryUtil.DefaultDynamicTableContext context = new FactoryUtil.DefaultDynamicTableContext(tablePath, catalogTable,
+        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
+    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
+    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
+        .createDynamicTableSource(context))
+        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
+    return dataStreamScanProvider.produceDataStream(execEnv);
+  }
+
+  /***
+   * A POJO that contains tableId and resolvedCatalogTable.
+   */
+  public static class TableDescriptor {
+    private ObjectIdentifier tableId;
+    private ResolvedCatalogTable resolvedCatalogTable;
+
+    public TableDescriptor(ObjectIdentifier tableId, ResolvedCatalogTable resolvedCatalogTable) {
+      this.tableId = tableId;
+      this.resolvedCatalogTable = resolvedCatalogTable;
+    }
+
+    public ObjectIdentifier getTableId() {
+      return tableId;
+    }
+
+    public ResolvedCatalogTable getResolvedCatalogTable() {
+      return resolvedCatalogTable;
+    }
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index a25f0149c149b..9ed0dfb807eb5 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -26,9 +26,11 @@
 import org.apache.hudi.sink.transform.Transformer;
 import org.apache.hudi.sink.utils.Pipelines;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodiePipeline;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
+import org.apache.hudi.utils.TestUtils;
 import org.apache.hudi.utils.source.ContinuousFileSource;
 
 import org.apache.flink.api.common.JobStatus;
@@ -57,6 +59,7 @@
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -326,4 +329,127 @@ private void testWriteToHoodieWithCluster(
 
     TestData.checkWrittenFullData(tempFile, expected);
   }
+
+  public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
+    JobClient client = execEnv.executeAsync(jobName);
+    if (isMor) {
+      if (client.getJobStatus().get() != JobStatus.FAILED) {
+        try {
+          TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
+          client.cancel();
+        } catch (Throwable var1) {
+          // ignored
+        }
+      }
+    } else {
+      // wait for the streaming job to finish
+      client.getJobExecutionResult().get();
+    }
+  }
+
+  @Test
+  public void testHoodiePipelineBuilderSource() throws Exception {
+    // create a StreamExecutionEnvironment instance.
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(1);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+    Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(FlinkOptions.TABLE_NAME, "t1");
+    conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+
+    // write 3 batches of data set
+    TestData.writeData(TestData.dataSetInsert(1, 2), conf);
+    TestData.writeData(TestData.dataSetInsert(3, 4), conf);
+    TestData.writeData(TestData.dataSetInsert(5, 6), conf);
+
+    String latestCommit = TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath());
+
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.READ_START_COMMIT.key(), latestCommit);
+
+    // read the hoodie table using the low-level source API.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_source")
+        .column("uuid string not null")
+        .column("name string")
+        .column("age int")
+        .column("`ts` timestamp(3)")
+        .column("`partition` string")
+        .pk("uuid")
+        .partition("partition")
+        .options(options);
+    DataStream<RowData> rowDataDataStream = builder.source(execEnv);
+    List<RowData> result = new ArrayList<>();
+    rowDataDataStream.executeAndCollect().forEachRemaining(result::add);
+    TimeUnit.SECONDS.sleep(2); // sleep 2 seconds to collect the data
+    TestData.assertRowDataEquals(result, TestData.dataSetInsert(5, 6));
+  }
+
+  @Test
+  public void testHoodiePipelineBuilderSink() throws Exception {
+    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+    Map<String, String> options = new HashMap<>();
+    execEnv.getConfig().disableObjectReuse();
+    execEnv.setParallelism(4);
+    // set up checkpoint interval
+    execEnv.enableCheckpointing(4000, CheckpointingMode.EXACTLY_ONCE);
+    execEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+    options.put(FlinkOptions.INDEX_TYPE.key(), "FLINK_STATE");
+    options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
+    options.put(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS.key(), "4");
+    options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.INDEX_KEY_FIELD.key(), "id");
+    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "1");
+    options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
+    options.put(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH.key(), Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResource("test_read_schema.avsc")).toString());
+    Configuration conf = Configuration.fromMap(options);
+    // Read from file source
+    RowType rowType =
+        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
+            .getLogicalType();
+
+    JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
+        rowType,
+        InternalTypeInfo.of(rowType),
+        false,
+        true,
+        TimestampFormat.ISO_8601
+    );
+    String sourcePath = Objects.requireNonNull(Thread.currentThread()
+        .getContextClassLoader().getResource("test_source.data")).toString();
+
+    TextInputFormat format = new TextInputFormat(new Path(sourcePath));
+    format.setFilesFilter(FilePathFilter.createDefaultFilter());
+    TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+    format.setCharsetName("UTF-8");
+
+    DataStream<RowData> dataStream = execEnv
+        // use PROCESS_CONTINUOUSLY mode to trigger checkpoint
+        .readFile(format, sourcePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000, typeInfo)
+        .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)))
+        .setParallelism(1);
+
+    // sink to the hoodie table using the low-level sink API.
+    HoodiePipeline.Builder builder = HoodiePipeline.builder("test_sink")
+        .column("uuid string not null")
+        .column("name string")
+        .column("age int")
+        .column("`ts` timestamp(3)")
+        .column("`partition` string")
+        .pk("uuid")
+        .partition("partition")
+        .options(options);
+
+    builder.sink(dataStream, false);
+
+    execute(execEnv, true, "Api_Sink_Test");
+    TestData.checkWrittenFullData(tempFile, EXPECTED);
+  }
+
 }