[Feature][ConnectorV2]add file excel sink #2585

Closed
wants to merge 34 commits into from
Changes from 31 commits
34 commits
049652d
add file excel sink
Aug 31, 2022
e525451
Merge branch 'apache:dev' into dev
Bingz2 Sep 1, 2022
89e2b4f
add flink e2e conf,support all datatype
Sep 1, 2022
0a5fd71
modify sheet to member variable
Sep 2, 2022
b64e547
Merge branch 'apache:dev' into dev
Bingz2 Sep 4, 2022
1265c23
fix flink e2e
Sep 4, 2022
a595be9
move excel dependency to file-connector
Sep 6, 2022
182f009
Handling code conflicts
Sep 7, 2022
4ad6160
Handling code conflicts
Sep 7, 2022
c218193
Handling code conflicts
Sep 7, 2022
c5470d4
Merge branch 'apache:dev' into dev
Bingz2 Sep 7, 2022
4973b36
Merge branch 'apache:dev' into dev
Bingz2 Sep 7, 2022
e6178bb
Merge branch 'apache:dev' into dev
Bingz2 Sep 9, 2022
0fe3dfe
Merge branch 'apache:dev' into dev
Bingz2 Sep 13, 2022
93cbbb0
Handling code conflicts
Bingz2 Sep 18, 2022
a9e57f1
Merge branch 'apache:dev' into dev
Bingz2 Sep 18, 2022
2513f4f
Handling code conflicts
Bingz2 Sep 24, 2022
144bb27
Merge branch 'apache:dev' into dev
Bingz2 Sep 24, 2022
a699681
use SXSSFWorkbook
Bingz2 Sep 24, 2022
2fb89b7
Resolving code conflicts
Bingz2 Oct 15, 2022
d3359f7
Resolving code conflicts
Bingz2 Oct 15, 2022
f025b89
Merge branch 'apache:dev' into dev
Bingz2 Oct 15, 2022
9a77d7e
Support for more data types
Bingz2 Oct 15, 2022
06edf3b
Support for more data types
Bingz2 Oct 16, 2022
6eee00e
Merge branch 'dev' into dev
EricJoy2048 Oct 25, 2022
162ba2f
Merge branch 'apache:dev' into dev
Bingz2 Oct 29, 2022
7605250
Merge branch 'apache:dev' into dev
Bingz2 Oct 30, 2022
971bc78
Resolving code conflicts
Nov 2, 2022
00134e4
Merge branch 'apache:dev' into dev
Bingz2 Nov 2, 2022
874af27
Resolving code conflicts
Nov 11, 2022
4f8d572
Merge branch 'apache:dev' into dev
Bingz2 Nov 11, 2022
5754104
Resolving code conflicts
Nov 18, 2022
78f8b2d
Merge branch 'apache:dev' into dev
Bingz2 Nov 18, 2022
9870a07
Merge branch 'apache:dev' into dev
Bingz2 Nov 19, 2022
27 changes: 26 additions & 1 deletion docs/en/connector-v2/sink/HdfsFile.md
@@ -19,6 +19,7 @@ By default, we use 2PC commit to ensure `exactly-once`
- [x] parquet
- [x] orc
- [x] json
- [x] excel

## Options

@@ -59,7 +60,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

The following file types are supported:

`text` `csv` `parquet` `orc` `json`
`text` `csv` `parquet` `orc` `json` `excel`

Please note that the final file name ends with the file_format's suffix; the suffix of the text file is `txt`.

@@ -86,6 +87,10 @@ The separator between columns in a row of data. Only needed by `text` and `csv`

The separator between rows in a file. Only needed by `text` and `csv` file format.

### max_rows_in_memory [int]

> Review comment (Member): add `max_rows_in_memory` to options

When the file format is `excel`, the maximum number of rows that can be cached in memory. Note that fonts must be installed when running on OpenJDK.

### partition_by [array]

Partition data based on selected fields
@@ -180,6 +185,26 @@ HdfsFile {

```

For excel file format

```bash

HdfsFile {
    fs.defaultFS="hdfs://hadoopcluster"
    path="/tmp/hive/warehouse/test2"
    partition_by=["age"]
    partition_dir_expression="${k0}=${v0}"
    is_partition_field_write_in_file=true
    file_name_expression="${transactionId}_${now}"
    file_format="excel"
    max_rows_in_memory=10000
    sink_columns=["name","age"]
    filename_time_format="yyyy.MM.dd"
    is_enable_transaction=true
}

```

## Changelog

### 2.2.0-beta 2022-09-26
10 changes: 5 additions & 5 deletions docs/en/connector-v2/sink/LocalFile.md
@@ -14,11 +14,11 @@ By default, we use 2PC commit to ensure `exactly-once`

- [ ] [schema projection](../../concept/connector-v2-features.md)
- [x] file format
- [x] text
- [x] csv
- [x] parquet
- [x] orc
- [x] json
- [x] text
- [x] csv
- [x] parquet
- [x] orc
- [x] json

## Options

13 changes: 13 additions & 0 deletions seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -36,6 +36,8 @@
        <commons.lang3.version>3.4</commons.lang3.version>
        <flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
        <parquet-avro.version>1.12.3</parquet-avro.version>
        <poi.version>4.1.2</poi.version>
        <poi-ooxml.version>4.1.2</poi-ooxml.version>
    </properties>

<dependencyManagement>
@@ -144,6 +146,17 @@
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>${poi.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>${poi-ooxml.version}</version>
        </dependency>
    </dependencies>

<build>
@@ -39,4 +39,5 @@ public class Constant {
    public static final String SINK_COLUMNS = "sink_columns";
    public static final String FILENAME_TIME_FORMAT = "filename_time_format";
    public static final String IS_ENABLE_TRANSACTION = "is_enable_transaction";
    public static final String MAX_ROWS_IN_MEMORY = "max_rows_in_memory";
}
@@ -18,11 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;

import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ExcelWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
@@ -87,6 +89,17 @@ public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
        public ReadStrategy getReadStrategy() {
            return new JsonReadStrategy();
        }
    },
    EXCEL("xlsx") {
        @Override
        public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
            return new ExcelWriteStrategy(textFileSinkConfig);
        }

        @Override
        public ReadStrategy getReadStrategy() {
            return new ExcelReadStrategy();
        }
    };

private final String suffix;
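
The enum hunk above relies on constant-specific method bodies: each format carries its file suffix and overrides its own factory methods. A minimal, self-contained sketch of that pattern follows. `FormatSketch`, `Format`, `describeWriter`, and `fileNameFor` are hypothetical names used only for illustration, not SeaTunnel APIs, and only the `txt` and `xlsx` suffixes are taken from this PR.

```java
import java.util.Locale;

public class FormatSketch {

    // Hypothetical stand-in for the connector's file format enum.
    enum Format {
        TEXT("txt") {
            @Override
            String describeWriter() {
                return "text writer";
            }
        },
        EXCEL("xlsx") {
            @Override
            String describeWriter() {
                return "excel writer";
            }
        };

        private final String suffix;

        Format(String suffix) {
            this.suffix = suffix;
        }

        String getSuffix() {
            return suffix;
        }

        // Every constant must supply its own implementation.
        abstract String describeWriter();
    }

    // Maps a file_format value such as "excel" to an enum constant and
    // appends that constant's suffix to the base file name.
    public static String fileNameFor(String baseName, String fileFormat) {
        Format format = Format.valueOf(fileFormat.toUpperCase(Locale.ROOT));
        return baseName + "." + format.getSuffix();
    }

    public static void main(String[] args) {
        System.out.println(fileNameFor("part-0", "excel"));
        System.out.println(Format.EXCEL.describeWriter());
    }
}
```

With this shape, adding a format means adding one constant; a constant that forgets to override the abstract method fails at compile time rather than at runtime.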
@@ -64,6 +64,8 @@ public class TextFileSinkConfig extends BaseTextFileConfig implements PartitionC

    private List<Integer> partitionFieldsIndexInRow;

    private int maxRowsInMemory;

    public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaTunnelRowTypeInfo) {
        super(config);
        checkArgument(!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
@@ -143,5 +145,9 @@ public TextFileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType seaT
                    .map(columnsMap::get)
                    .collect(Collectors.toList());
        }

        if (config.hasPath(Constant.MAX_ROWS_IN_MEMORY)) {
            this.maxRowsInMemory = config.getInt(Constant.MAX_ROWS_IN_MEMORY);
        }
    }
}
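
One thing worth checking in the `TextFileSinkConfig` hunk: when `max_rows_in_memory` is absent, the `int` field keeps Java's default of `0`. How `ExcelWriteStrategy` treats `0` is not visible in this excerpt, so an explicit default plus a range check may be safer. The sketch below shows the idea with a plain `Map` standing in for the `Config` object. `MaxRowsOption`, `resolve`, and the fallback of `100` are assumptions made for illustration, not values from this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxRowsOption {

    static final String MAX_ROWS_IN_MEMORY = "max_rows_in_memory";

    // Assumed fallback, chosen only for this sketch.
    static final int DEFAULT_MAX_ROWS_IN_MEMORY = 100;

    // Returns the configured value, or the fallback when the key is absent.
    // Rejects zero and negative values instead of passing them downstream.
    public static int resolve(Map<String, Object> options) {
        Object raw = options.get(MAX_ROWS_IN_MEMORY);
        if (raw == null) {
            return DEFAULT_MAX_ROWS_IN_MEMORY;
        }
        int value = Integer.parseInt(raw.toString().trim());
        if (value <= 0) {
            throw new IllegalArgumentException(
                    MAX_ROWS_IN_MEMORY + " must be a positive integer, got " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, Object> options = new HashMap<>();
        System.out.println(resolve(options));   // key absent: fallback is used
        options.put(MAX_ROWS_IN_MEMORY, 10000);
        System.out.println(resolve(options));   // key present: configured value
    }
}
```

Failing fast at configuration time keeps an invalid value from surfacing later, when the writer is first created on a worker.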