diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md index 1d562d66328..c3d239db379 100644 --- a/docs/en/connector-v2/sink/HdfsFile.md +++ b/docs/en/connector-v2/sink/HdfsFile.md @@ -19,6 +19,7 @@ By default, we use 2PC commit to ensure `exactly-once` - [x] parquet - [x] orc - [x] json + - [x] excel ## Options @@ -59,7 +60,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran We supported as the following file types: -`text` `csv` `parquet` `orc` `json` +`text` `csv` `parquet` `orc` `json` `excel` Please note that, The final file name will ends with the file_format's suffix, the suffix of the text file is `txt`. @@ -86,6 +87,10 @@ The separator between columns in a row of data. Only needed by `text` and `csv` The separator between rows in a file. Only needed by `text` and `csv` file format. +### max_rows_in_memory [int] + +When the file format is `excel`, the maximum number of data items that can be cached in memory. Note that you need to install fonts when using OpenJDK.
+ ### partition_by [array] Partition data based on selected fields @@ -180,6 +185,26 @@ HdfsFile { ``` +For excel file format + +```bash + +HdfsFile { + fs.defaultFS="hdfs://hadoopcluster" + path="/tmp/hive/warehouse/test2" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format="excel" + max_rows_in_memory=10000 + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true +} + +``` + ## Changelog ### 2.2.0-beta 2022-09-26 diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index d0fa20da6e5..4719c26beb7 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -14,11 +14,11 @@ By default, we use 2PC commit to ensure `exactly-once` - [ ] [schema projection](../../concept/connector-v2-features.md) - [x] file format - - [x] text - - [x] csv - - [x] parquet - - [x] orc - - [x] json + - [x] text + - [x] csv + - [x] parquet + - [x] orc + - [x] json ## Options diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml index f9209305665..a0fc2c01c4c 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml @@ -36,6 +36,8 @@ 3.4 2.7.5-7.0 1.12.3 + 4.1.2 + 4.1.2 @@ -144,6 +146,17 @@ + + org.apache.poi + poi + ${poi.version} + + + + org.apache.poi + poi-ooxml + ${poi-ooxml.version} + diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java index 4cddc2f4cff..bfe3002c140 100644 --- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java @@ -18,11 +18,13 @@ package org.apache.seatunnel.connectors.seatunnel.file.config; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ExcelWriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy; import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy; +import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy; import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy; @@ -87,6 +89,17 @@ public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) { public ReadStrategy getReadStrategy() { return new JsonReadStrategy(); } + }, + EXCEL("xlsx") { + @Override + public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) { + return new ExcelWriteStrategy(textFileSinkConfig); + } + + @Override + public ReadStrategy getReadStrategy() { + return new ExcelReadStrategy(); + } }; private final String suffix; diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java index c56171172d0..38ec2dd288d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java @@ -62,6 +62,8 @@ public class TextFileSinkConfig extends BaseTextFileConfig implements PartitionC private List partitionFieldsIndexInRow; + private int maxRowsInMemory; + public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunnelRowTypeInfo) { super(config); checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames()))); @@ -144,5 +146,9 @@ public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaT .map(columnsMap::get) .collect(Collectors.toList()); } + + if (config.hasPath("max_rows_in_memory")) { + this.maxRowsInMemory = config.getInt("max_rows_in_memory"); + } } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/ExcelGenerator.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/ExcelGenerator.java new file mode 100644 index 00000000000..53a6b884fbd --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/ExcelGenerator.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+
+import org.apache.poi.ss.usermodel.Cell;
+import org.apache.poi.ss.usermodel.CellStyle;
+import org.apache.poi.ss.usermodel.CreationHelper;
+import org.apache.poi.ss.usermodel.Row;
+import org.apache.poi.ss.usermodel.Sheet;
+import org.apache.poi.ss.usermodel.Workbook;
+import org.apache.poi.xssf.streaming.SXSSFWorkbook;
+import org.apache.poi.xssf.usermodel.XSSFWorkbook;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.Array;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builds a single-sheet ("Sheet1") Excel workbook from {@link SeaTunnelRow}s.
+ *
+ * <p>The constructor writes a header row holding the sink column names,
+ * {@link #writeData(SeaTunnelRow)} appends one sheet row per record and
+ * {@link #flushAndCloseExcel(OutputStream)} serializes the workbook and releases it.
+ * An instance keeps a mutable row counter and is meant to be used for one output file.
+ */
+public class ExcelGenerator {
+    private final List<Integer> sinkColumnsIndexInRow;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final DateUtils.Formatter dateFormat;
+    private final DateTimeUtils.Formatter dateTimeFormat;
+    private final TimeUtils.Formatter timeFormat;
+    private final String fieldDelimiter;
+    private final Workbook wb;
+    private final CellStyle wholeNumberCellStyle;
+    private final CellStyle stringCellStyle;
+    private final CellStyle dateCellStyle;
+    private final CellStyle dateTimeCellStyle;
+    private final CellStyle timeCellStyle;
+    private final Sheet st;
+    /** Index of the next sheet row to create; row 0 is the header. */
+    private int rowIndex = 0;
+
+    /**
+     * Creates the workbook and sheet and writes the header row.
+     *
+     * @param sinkColumnsIndexInRow indexes (into the row type) of the columns to write; the same
+     *                              index is used as the cell's column position in the sheet
+     * @param seaTunnelRowType      row type supplying field names and field types
+     * @param textFileSinkConfig    sink configuration; a positive {@code maxRowsInMemory} selects a
+     *                              streaming {@link SXSSFWorkbook} built with that value, otherwise
+     *                              an {@link XSSFWorkbook} is used
+     */
+    public ExcelGenerator(List<Integer> sinkColumnsIndexInRow, SeaTunnelRowType seaTunnelRowType, TextFileSinkConfig textFileSinkConfig) {
+        this.sinkColumnsIndexInRow = sinkColumnsIndexInRow;
+        this.seaTunnelRowType = seaTunnelRowType;
+        int maxRowsInMemory = textFileSinkConfig.getMaxRowsInMemory();
+        if (maxRowsInMemory > 0) {
+            this.wb = new SXSSFWorkbook(maxRowsInMemory);
+        } else {
+            this.wb = new XSSFWorkbook();
+        }
+        this.st = wb.createSheet("Sheet1");
+        Row headerRow = st.createRow(this.rowIndex);
+        for (Integer columnIndex : sinkColumnsIndexInRow) {
+            headerRow.createCell(columnIndex).setCellValue(seaTunnelRowType.getFieldName(columnIndex));
+        }
+        this.dateFormat = textFileSinkConfig.getDateFormat();
+        this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
+        this.timeFormat = textFileSinkConfig.getTimeFormat();
+        this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter();
+        this.wholeNumberCellStyle = createStyle(wb, "General");
+        this.stringCellStyle = createStyle(wb, "@");
+        this.dateCellStyle = createStyle(wb, dateFormat.getValue());
+        this.dateTimeCellStyle = createStyle(wb, dateTimeFormat.getValue());
+        this.timeCellStyle = createStyle(wb, timeFormat.getValue());
+
+        this.rowIndex += 1;
+    }
+
+    /**
+     * Appends one sheet row containing the configured sink columns of {@code seaTunnelRow}.
+     *
+     * @param seaTunnelRow record to write
+     * @throws RuntimeException if a field has a SQL type that is not handled
+     */
+    public void writeData(SeaTunnelRow seaTunnelRow) {
+        Row excelRow = this.st.createRow(this.rowIndex);
+        SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+        for (Integer columnIndex : sinkColumnsIndexInRow) {
+            Cell cell = excelRow.createCell(columnIndex);
+            setCellValue(fieldTypes[columnIndex], seaTunnelRow.getField(columnIndex), cell);
+        }
+        this.rowIndex += 1;
+    }
+
+    /**
+     * Writes the workbook to {@code output} and releases the workbook. The stream itself is not
+     * closed here; that stays with the caller.
+     *
+     * @param output destination stream
+     * @throws IOException if writing or closing the workbook fails
+     */
+    public void flushAndCloseExcel(OutputStream output) throws IOException {
+        try {
+            wb.write(output);
+        } finally {
+            try {
+                // Close even when write() failed so the workbook is always released.
+                wb.close();
+            } finally {
+                // SXSSF keeps sheet data in temporary files that must be removed explicitly.
+                if (wb instanceof SXSSFWorkbook) {
+                    ((SXSSFWorkbook) wb).dispose();
+                }
+            }
+        }
+    }
+
+    /** Stores {@code value} in {@code cell} with the cell style matching its SeaTunnel SQL type. */
+    private void setCellValue(SeaTunnelDataType<?> type, Object value, Cell cell) {
+        if (value == null) {
+            cell.setBlank();
+            return;
+        }
+        switch (type.getSqlType()) {
+            case STRING:
+                cell.setCellValue((String) value);
+                cell.setCellStyle(stringCellStyle);
+                break;
+            case BOOLEAN:
+                cell.setCellValue((Boolean) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case SMALLINT:
+                cell.setCellValue((short) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case TINYINT:
+                cell.setCellValue((byte) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case INT:
+                cell.setCellValue((int) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case BIGINT:
+                cell.setCellValue((long) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case FLOAT:
+                cell.setCellValue((float) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case DOUBLE:
+                cell.setCellValue((double) value);
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case DECIMAL:
+                // Stored as a double, the only numeric representation the cell API accepts here.
+                cell.setCellValue(Double.parseDouble(value.toString()));
+                cell.setCellStyle(wholeNumberCellStyle);
+                break;
+            case BYTES:
+                // Rendered as the list-style string of the individual byte values.
+                List<String> arrayData = new ArrayList<>();
+                for (int i = 0; i < Array.getLength(value); i++) {
+                    arrayData.add(String.valueOf(Array.get(value, i)));
+                }
+                cell.setCellValue(arrayData.toString());
+                cell.setCellStyle(stringCellStyle);
+                break;
+            case MAP:
+            case ARRAY:
+                cell.setCellValue(JsonUtils.toJsonString(value));
+                cell.setCellStyle(stringCellStyle);
+                break;
+            case ROW:
+                // Nested rows are flattened into one delimiter-joined string cell.
+                Object[] fields = ((SeaTunnelRow) value).getFields();
+                String[] strings = new String[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    strings[i] = convert(fields[i], ((SeaTunnelRowType) type).getFieldType(i));
+                }
+                cell.setCellValue(String.join(fieldDelimiter, strings));
+                cell.setCellStyle(stringCellStyle);
+                break;
+            case DATE:
+                cell.setCellValue((LocalDate) value);
+                cell.setCellStyle(dateCellStyle);
+                break;
+            case TIMESTAMP:
+            case TIME:
+                setTimestampColumn(value, cell);
+                break;
+            default:
+                String errorMsg = String.format("[%s] type not support ", type.getSqlType());
+                throw new RuntimeException(errorMsg);
+        }
+    }
+
+    /**
+     * Renders a field of a nested row as text.
+     *
+     * @return the textual form of {@code field}; an empty string for {@code null} or NULL-typed fields
+     * @throws UnsupportedOperationException if the field's SQL type is not handled
+     */
+    private String convert(Object field, SeaTunnelDataType<?> fieldType) {
+        if (field == null) {
+            return "";
+        }
+        switch (fieldType.getSqlType()) {
+            case ARRAY:
+            case MAP:
+                return JsonUtils.toJsonString(field);
+            case STRING:
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DECIMAL:
+                return field.toString();
+            case DATE:
+                return DateUtils.toString((LocalDate) field, dateFormat);
+            case TIME:
+                return TimeUtils.toString((LocalTime) field, timeFormat);
+            case TIMESTAMP:
+                return DateTimeUtils.toString((LocalDateTime) field, dateTimeFormat);
+            case NULL:
+                return "";
+            case BYTES:
+                return new String((byte[]) field);
+            case ROW:
+                Object[] fields = ((SeaTunnelRow) field).getFields();
+                String[] strings = new String[fields.length];
+                for (int i = 0; i < fields.length; i++) {
+                    strings[i] = convert(fields[i], ((SeaTunnelRowType) fieldType).getFieldType(i));
+                }
+                return String.join(fieldDelimiter, strings);
+            default:
+                throw new UnsupportedOperationException("SeaTunnel format text not supported for parsing this type");
+        }
+    }
+
+    /**
+     * Writes a TIME/TIMESTAMP value, choosing the cell style from the runtime class of the value.
+     *
+     * @throws RuntimeException if the value is none of the supported date/time classes
+     */
+    private void setTimestampColumn(Object value, Cell cell) {
+        if (value instanceof Timestamp) {
+            cell.setCellValue((Timestamp) value);
+            cell.setCellStyle(dateTimeCellStyle);
+        } else if (value instanceof LocalDate) {
+            cell.setCellValue((LocalDate) value);
+            cell.setCellStyle(dateCellStyle);
+        } else if (value instanceof LocalDateTime) {
+            cell.setCellValue(Timestamp.valueOf((LocalDateTime) value));
+            cell.setCellStyle(dateTimeCellStyle);
+        } else if (value instanceof LocalTime) {
+            // A bare time is anchored to epoch day 0 so it can be stored as a timestamp.
+            cell.setCellValue(Timestamp.valueOf(((LocalTime) value).atDate(LocalDate.ofEpochDay(0))));
+            cell.setCellStyle(timeCellStyle);
+        } else {
+            throw new RuntimeException("Time series type expected for field");
+        }
+    }
+
+    /** Creates a cell style in {@code wb} that uses the given data format string. */
+    private CellStyle createStyle(Workbook wb, String format) {
+        CreationHelper creationHelper = wb.getCreationHelper();
+        CellStyle cellStyle = wb.createCellStyle();
+        cellStyle.setDataFormat(creationHelper.createDataFormat().getFormat(format));
+        return cellStyle;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
new file mode 100644
index 00000000000..79ebd9e5de4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ExcelWriteStrategy.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.util.ExcelGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
+
+import lombok.NonNull;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Write strategy for the {@code excel} file format.
+ *
+ * <p>Rows are buffered in one {@link ExcelGenerator} per file path being written; the files
+ * themselves are only created and filled when {@link #finishAndCloseFile()} runs.
+ */
+public class ExcelWriteStrategy extends AbstractWriteStrategy {
+    /** Generators keyed by the path of the file they will be flushed to. */
+    private final Map<String, ExcelGenerator> beingWrittenWriter;
+
+    public ExcelWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
+        super(textFileSinkConfig);
+        this.beingWrittenWriter = new HashMap<>();
+    }
+
+    /**
+     * Appends {@code seaTunnelRow} to the generator of the file the row belongs to.
+     *
+     * @param seaTunnelRow row to write
+     */
+    @Override
+    public void write(SeaTunnelRow seaTunnelRow) throws Exception {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        ExcelGenerator excelGenerator = getOrCreateExcelGenerator(filePath);
+        excelGenerator.writeData(seaTunnelRow);
+    }
+
+    /**
+     * Flushes every buffered workbook to its file, registers the file in {@code needMoveFiles}
+     * and then forgets the generators.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if a file cannot be created or written
+     */
+    @Override
+    public void finishAndCloseFile() {
+        this.beingWrittenWriter.forEach((filePath, excelGenerator) -> {
+            try {
+                FileSystemUtils.createFile(filePath);
+                // try-with-resources closes the stream even when flushing the workbook fails.
+                try (FSDataOutputStream fileOutputStream = FileSystemUtils.getOutputStream(filePath)) {
+                    excelGenerator.flushAndCloseExcel(fileOutputStream);
+                }
+            } catch (IOException e) {
+                log.error("can not get output file stream", e);
+                throw new RuntimeException(e);
+            }
+            needMoveFiles.put(filePath, getTargetLocation(filePath));
+        });
+        // The workbooks are closed now; dropping them prevents a later call from writing
+        // closed workbooks again and re-registering files that were already handled.
+        this.beingWrittenWriter.clear();
+    }
+
+    /** Returns the generator registered for {@code filePath}, creating it on first use. */
+    private ExcelGenerator getOrCreateExcelGenerator(@NonNull String filePath) {
+        return this.beingWrittenWriter.computeIfAbsent(filePath,
+            path -> new ExcelGenerator(sinkColumnsIndexInRow, seaTunnelRowType, textFileSinkConfig));
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java new file mode 100644 index 00000000000..07ed590de0a --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+
+/**
+ * Placeholder read strategy for the {@code excel} file format.
+ *
+ * <p>Reading excel files is not implemented. Both methods fail fast with a clear message
+ * instead of silently emitting no rows or handing back a {@code null} row type.
+ */
+public class ExcelReadStrategy extends AbstractReadStrategy {
+    private static final String NOT_SUPPORTED_MESSAGE = "Reading excel files is not supported yet";
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+        throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_excel.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_excel.conf
new file mode 100644
index 00000000000..b38b8e52ad3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_excel.conf
@@ -0,0 +1,70 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + HdfsFile { + path="/tmp/hive/warehouse/test2" + row_delimiter="\n" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format="excel" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_excel.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_excel.conf new file mode 100644 index 00000000000..125b9e7be91 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_excel.conf @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of batch processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select name,age from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql +} + +sink { + LocalFile { + path="/tmp/hive/warehouse/test2" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format="excel" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error" + } + + # If you
would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile +} \ No newline at end of file