[Feature][Connector-V2] Add iceberg source connector #2615

Merged (2 commits) on Sep 16, 2022
6 changes: 5 additions & 1 deletion LICENSE
@@ -222,4 +222,8 @@ seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connec
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java from https://github.com/apache/flink
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java from https://github.com/apache/flink
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/BiConsumerWithException.java from https://github.com/apache/flink
seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/JdbcStatementBuilder.java from https://github.com/apache/flink
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergScanSplitPlanner.java from https://github.com/apache/iceberg
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/scan/IcebergStreamScanStrategy.java from https://github.com/apache/iceberg
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergEnumerationResult.java from https://github.com/apache/iceberg
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergEnumeratorPosition.java from https://github.com/apache/iceberg
19 changes: 18 additions & 1 deletion NOTICE
@@ -18,4 +18,21 @@ The Apache Software Foundation (http://www.apache.org/).


Flink : Connectors : JDBC
Copyright 2014-2022 The Apache Software Foundation



// ------------------------------------------------------------------
// NOTICE file corresponding to the section 4d of The Apache License,
// Version 2.0, in this case for Apache Iceberg
// ------------------------------------------------------------------

Apache Iceberg
Copyright 2017-2022 The Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).


Iceberg : Flink
Copyright 2017-2022 The Apache Software Foundation
157 changes: 157 additions & 0 deletions docs/en/connector-v2/source/Iceberg.md
@@ -0,0 +1,157 @@
# Apache Iceberg

> Apache Iceberg source connector

## Description

Source connector for Apache Iceberg. It supports both batch and stream mode.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

- [x] data format
    - [x] parquet
    - [x] orc
    - [x] avro
- [x] iceberg catalog
    - [x] hadoop(2.7.5)
    - [x] hive(2.3.9)
## Options

| name                     | type    | required | default value        |
|--------------------------|---------|----------|----------------------|
| catalog_name             | string  | yes      | -                    |
| catalog_type             | string  | yes      | -                    |
| uri                      | string  | no       | -                    |
| warehouse                | string  | yes      | -                    |
| namespace                | string  | yes      | -                    |
| table                    | string  | yes      | -                    |
| case_sensitive           | boolean | no       | false                |
| fields                   | array   | no       | -                    |
| start_snapshot_timestamp | long    | no       | -                    |
| start_snapshot_id        | long    | no       | -                    |
| end_snapshot_id          | long    | no       | -                    |
| use_snapshot_id          | long    | no       | -                    |
| use_snapshot_timestamp   | long    | no       | -                    |
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT |

### catalog_name [string]

User-specified catalog name.

### catalog_type [string]

The supported values are:
- hive: The Hive metastore catalog.
- hadoop: The Hadoop catalog.

### uri [string]

The Thrift URI of the Hive metastore. Used when `catalog_type` is `hive`.

### warehouse [string]

The location to store metadata files and data files.

### namespace [string]

The Iceberg database name in the backend catalog.

### table [string]

The Iceberg table name in the backend catalog.

### case_sensitive [boolean]

If data columns were selected via `fields`, controls whether column names are matched against the schema case-sensitively.

### fields [array]

Use projection to select data columns and their order.

### start_snapshot_id [long]

Instructs this scan to look for changes starting from a particular snapshot (exclusive).

### start_snapshot_timestamp [long]

Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in milliseconds since the Unix epoch.

### end_snapshot_id [long]

Instructs this scan to look for changes up to a particular snapshot (inclusive).
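
As a rough sketch, a bounded incremental read might combine `start_snapshot_id` (exclusive) and `end_snapshot_id` (inclusive) as shown here. The snapshot IDs are placeholder values, not taken from a real table, and the catalog settings simply mirror the examples in this document.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    # Placeholder snapshot IDs, assumed for illustration only.
    # Read changes committed after this snapshot (exclusive) ...
    start_snapshot_id = 1000000000000000001
    # ... up to and including this snapshot (inclusive).
    end_snapshot_id = 1000000000000000002
  }
}
```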

### use_snapshot_id [long]

Instructs this scan to use the given snapshot ID.

### use_snapshot_timestamp [long]

Instructs this scan to use the most recent snapshot as of the given timestamp, in milliseconds since the Unix epoch.
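
A minimal sketch of reading a table as of a point in time, assuming only one of `use_snapshot_id` and `use_snapshot_timestamp` is set per job. The timestamp is a placeholder in epoch milliseconds, and the catalog settings mirror the examples in this document.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    # Placeholder timestamp in milliseconds since the Unix epoch
    # (1663286400000 is 2022-09-16T00:00:00Z); the scan uses the most
    # recent snapshot as of this time.
    use_snapshot_timestamp = 1663286400000

    # Alternative (assumed to be used instead of the timestamp, not with it):
    # use_snapshot_id = 1000000000000000001
  }
}
```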

### stream_scan_strategy [enum]

Starting strategy for stream mode execution. Defaults to `FROM_LATEST_SNAPSHOT` if no value is specified.
The supported values are:
- TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan, then switch to incremental mode.
- FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot (inclusive).
- FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot (inclusive).
- FROM_SNAPSHOT_ID: Start incremental mode from the snapshot with a specific ID (inclusive).
- FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from the snapshot with a specific timestamp (inclusive).
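
The following sketch shows how a stream-mode job might start from a specific snapshot. It assumes that `FROM_SNAPSHOT_ID` takes its starting point from `start_snapshot_id`, which this document does not state explicitly, and it uses a placeholder snapshot ID. The job-level setting that enables stream mode is not shown.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    # Start incremental reads from a specific snapshot (inclusive).
    stream_scan_strategy = "FROM_SNAPSHOT_ID"
    # Placeholder snapshot ID; assumed to be the option this strategy reads.
    start_snapshot_id = 1000000000000000001
  }
}
```

Under the same assumption, `FROM_SNAPSHOT_TIMESTAMP` would pair with `start_snapshot_timestamp` instead.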

## Example

simple

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```
Or

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hive"
    uri = "thrift://localhost:9083"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```

schema projection

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    fields {
      f2 = "boolean"
      f1 = "bigint"
      f3 = "int"
      f4 = "bigint"
    }
  }
}
```
2 changes: 1 addition & 1 deletion plugin-mapping.properties
@@ -128,4 +128,4 @@ seatunnel.sink.DataHub = connector-datahub
seatunnel.sink.Sentry = connector-sentry
seatunnel.source.MongoDB = connector-mongodb
seatunnel.sink.MongoDB = connector-mongodb

seatunnel.source.Iceberg = connector-iceberg
@@ -59,6 +59,9 @@ public void close() {
    }

    private String fieldToString(SeaTunnelDataType<?> type, Object value) {
        if (value == null) {
            return null;
        }
        switch (type.getSqlType()) {
            case ARRAY:
            case BYTES: