Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[update] Update query docs of 3.0/dev, fix typo and issues #1271

Merged
merged 39 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
9a405e5
1
KassieZ Nov 2, 2024
fc3d483
2
KassieZ Nov 2, 2024
a3a099a
2
KassieZ Nov 3, 2024
8549f7b
deadlink
KassieZ Nov 5, 2024
7e4d4d4
4
KassieZ Nov 5, 2024
b280558
5
KassieZ Nov 5, 2024
1c88e60
Merge branch 'master' into update-version3.0/dev-query
KassieZ Nov 5, 2024
7558d9b
6-typo
KassieZ Nov 5, 2024
f6ddc4d
7 issue
KassieZ Nov 5, 2024
29ec984
5-deadlink
KassieZ Nov 5, 2024
082925c
5-deadlink
KassieZ Nov 5, 2024
1044c18
Merge branch 'update-version3.0/dev-query' of https://github.com/Kass…
KassieZ Nov 5, 2024
6b575b9
[fix] rewrite tiered storage for hdd and ssd (#1282)
dataroaring Nov 5, 2024
f6c8c4f
8delete files
KassieZ Nov 5, 2024
048f9ea
[fix] remove useless docs (#1279)
dataroaring Nov 5, 2024
33f62d1
[doc](typo) fix some typo in `update` section (#1280)
yagagagaga Nov 5, 2024
14fbf7c
[doc](typo) fix some typo in `remote-storage` section (#1278)
yagagagaga Nov 5, 2024
72862c7
[doc](unique key) fix the description of default settings of MoW (#1288)
zhannngchen Nov 5, 2024
db328ff
Update standard-deployment.md (#1232)
wangtianyi2004 Nov 5, 2024
1b30789
[doc](cooldown)add enable_cooldown_replica_affinity config (#1245)
cjj2010 Nov 5, 2024
4f5cd14
doc: remove redundant indentation among four blog files about transac…
Jake-00 Nov 5, 2024
b742955
[doc](typo) fix some typo in `delete` section (#1274)
yagagagaga Nov 5, 2024
f517b46
[doc] fix dead link (#1281)
intelligentfu8 Nov 5, 2024
4681671
[doc](routine load) fix and adjust routine load document (#1286)
sollhui Nov 5, 2024
88a0df4
[fix] fix tiered-storage hdd-ssd (#1287)
dataroaring Nov 5, 2024
0a86f07
[web](topbanner) Update top banner info (#1289)
KassieZ Nov 5, 2024
87f39fb
[doc](ecosystem) improve spark connector doc (#1292)
JNSimba Nov 6, 2024
9bdf006
1
KassieZ Nov 2, 2024
80fe38b
2
KassieZ Nov 2, 2024
45b0bab
2
KassieZ Nov 3, 2024
ca0a7a8
deadlink
KassieZ Nov 5, 2024
3827c47
4
KassieZ Nov 5, 2024
7decf97
5
KassieZ Nov 5, 2024
277eecf
6-typo
KassieZ Nov 5, 2024
d857b52
7 issue
KassieZ Nov 5, 2024
f0ce120
5-deadlink
KassieZ Nov 5, 2024
f989bab
5-deadlink
KassieZ Nov 5, 2024
6a7f99c
8delete files
KassieZ Nov 5, 2024
b6d5fbb
Merge branch 'update-version3.0/dev-query' of https://github.com/Kass…
KassieZ Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion common_docs_zh/ecosystem/hive-hll-udf.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ under the License.

# Hive HLL UDF

Hive HLL UDF 提供了在 hive 表中生成 HLL 运算等 UDF,Hive 中的 HLL 与 Doris HLL 完全一致,Hive 中的 HLL 可以通过 Spark HLL Load 导入 Doris。关于 HLL 更多介绍可以参考:[使用 HLL 近似去重](../query/duplicate/using-hll.md)
Hive HLL UDF 提供了在 hive 表中生成 HLL 运算等 UDF,Hive 中的 HLL 与 Doris HLL 完全一致,Hive 中的 HLL 可以通过 Spark HLL Load 导入 Doris。关于 HLL 更多介绍可以参考:[使用 HLL 近似去重](https://doris.apache.org/zh-CN/docs/query-acceleration/distinct-counts/using-hll/)

函数简介:
1. UDAF
Expand Down
132 changes: 67 additions & 65 deletions common_docs_zh/ecosystem/spark-doris-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,30 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据
| 1.1.0 | 3.2, 3.1, 2.3 | 1.0 + | 8 | 2.12, 2.11 |
| 1.0.1 | 3.1, 2.3 | 0.12 - 0.15 | 8 | 2.12, 2.11 |

## 编译与安装
## 使用

准备工作
### Maven
```
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.4_2.12</artifactId>
<version>1.3.2</version>
</dependency>
```

**备注**

1. 请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

2. 也可从[这里](https://repo.maven.apache.org/maven2/org/apache/doris/)下载相关版本 jar 包。

### 编译

编译时,可直接运行 `sh build.sh`,具体可参考这里。

编译成功后,会在 `dist` 目录生成目标 jar 包,如:spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar。 将此文件复制到 `Spark` 的 `ClassPath` 中即可使用 `Spark-Doris-Connector`。 例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。
也可以

1. 修改`custom_env.sh.tpl`文件,重命名为`custom_env.sh`

2. 在源码目录下执行:
`sh build.sh`
Expand All @@ -59,36 +78,18 @@ Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据

例如,`Local` 模式运行的 `Spark`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Spark`,则将此文件放入预部署包中。

例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar
包路径

例如将 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 上传到 hdfs 并在 `spark.yarn.jars` 参数上添加 hdfs 上的 Jar包路径
```shell
1. 上传 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 到 hdfs。

```
hdfs dfs -mkdir /spark-jars/
hdfs dfs -put /your_local_path/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar /spark-jars/
```

2. 在集群中添加 `spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar` 依赖。

```
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.2.0-SNAPSHOT.jar
```

## 使用 Maven 管理

```
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.4_2.12</artifactId>
<version>1.3.0</version>
</dependency>
```

**注意**

请根据不同的 Spark 和 Scala 版本替换相应的 Connector 版本。

## 使用示例

### 读取
Expand All @@ -112,7 +113,7 @@ FROM spark_doris;

#### DataFrame

```scala
```java
val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
Expand All @@ -125,7 +126,7 @@ dorisSparkDF.show(5)

#### RDD

```scala
```java
import org.apache.doris.spark._

val dorisSparkRDD = sc.dorisRDD(
Expand All @@ -142,7 +143,7 @@ dorisSparkRDD.collect()

#### pySpark

```scala
```java
dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
Expand Down Expand Up @@ -182,8 +183,8 @@ FROM YOUR_TABLE

#### DataFrame(batch/stream)

```scala
## batch sink
```java
// batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
Expand All @@ -203,9 +204,9 @@ mockDataDF.write.format("doris")
// .option("save_mode", SaveMode.Overwrite)
.save()

## stream sink(StructuredStreaming)
// stream sink(StructuredStreaming)

### 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。
// 结果 DataFrame 和 doris 表相同的结构化数据, 配置方式和批量模式一致。
val sourceDf = spark.readStream.
.format("your_own_stream_source")
.load()
Expand All @@ -222,7 +223,7 @@ resultDf.writeStream
.start()
.awaitTermination()

### 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值
// 结果 DataFrame 中存在某一列的数据可以直接写入的,比如符合导入规范的 Kafka 消息中的 value 值

val kafkaSource = spark.readStream
.format("kafka")
Expand Down Expand Up @@ -265,14 +266,14 @@ kafkaSource.selectExpr("CAST(value as STRING)")
| doris.request.query.timeout.s | 3600 | 查询 doris 的超时时间,默认值为 1 小时,-1 表示无超时限制 |
| doris.request.tablet.size | Integer.MAX_VALUE | 一个 RDD Partition 对应的 Doris Tablet 个数。<br />此数值设置越小,则会生成越多的 Partition。从而提升 Spark 侧的并行度,但同时会对 Doris 造成更大的压力。 |
| doris.read.field | -- | 读取 Doris 表的列名列表,多列之间使用逗号分隔 |
| doris.batch.size | 1024 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。<br />从而减轻网络延迟所带来的额外时间开销。 |
| doris.batch.size | 4064 | 一次从 BE 读取数据的最大行数。增大此数值可减少 Spark 与 Doris 之间建立连接的次数。<br />从而减轻网络延迟所带来的额外时间开销。 |
| doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
| doris.deserialize.arrow.async | false | 是否支持异步转换 Arrow 格式到 spark-doris-connector 迭代所需的 RowBatch |
| doris.deserialize.queue.size | 64 | 异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效 |
| doris.write.fields | -- | 指定写入 Doris 表的字段或者字段顺序,多列之间使用逗号分隔。<br />默认写入时要按照 Doris 表字段顺序写入全部字段。 |
| doris.sink.batch.size | 100000 | 单次写 BE 的最大行数 |
| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数 |
| doris.sink.properties.format | csv | Stream Load 的数据格式。<br/>共支持 3 种格式:csv,json,arrow(1.4.0 版本开始支持)<br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) |
| doris.sink.max-retries | 0 | 写 BE 失败之后的重试次数,从 1.3.0 版本开始, 默认值为 0,即默认不进行重试。当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。 |
| doris.sink.properties.format | csv | Stream Load 的数据格式。<br/>共支持 3 种格式:csv,json,arrow <br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) |
| doris.sink.properties.* | -- | Stream Load 的导入参数。<br/>例如:<br/>指定列分隔符:`'doris.sink.properties.column_separator' = ','`等<br/> [更多参数详情](https://doris.apache.org/zh-CN/docs/data-operate/import/stream-load-manual/) |
| doris.sink.task.partition.size | -- | Doris 写入任务对应的 Partition 个数。Spark RDD 经过过滤等操作,最后写入的 Partition 数可能会比较大,但每个 Partition 对应的记录数比较少,导致写入频率增加和计算资源浪费。<br/>此数值设置越小,可以降低 Doris 写入频率,减少 Doris 合并压力。该参数配合 doris.sink.task.use.repartition 使用。 |
| doris.sink.task.use.repartition | false | 是否采用 repartition 方式控制 Doris 写入 Partition 数。默认值为 false,采用 coalesce 方式控制(注意:如果在写入之前没有 Spark action 算子,可能会导致整个计算并行度降低)。<br/>如果设置为 true,则采用 repartition 方式(注意:可设置最后 Partition 数,但会额外增加 shuffle 开销)。 |
Expand Down Expand Up @@ -303,9 +304,36 @@ kafkaSource.selectExpr("CAST(value as STRING)")
| doris.request.auth.password | -- | 访问 Doris 的密码 |
| doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。 |

:::tip

1. 在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP` 或 `HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下:
## Doris 和 Spark 列类型映射关系

| Doris Type | Spark Type |
|------------|----------------------------------|
| NULL_TYPE | DataTypes.NullType |
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DATE | DataTypes.DateType |
| DATETIME | DataTypes.StringType<sup>1</sup> |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| LARGEINT | DecimalType |
| VARCHAR | DataTypes.StringType |
| TIME | DataTypes.DoubleType |
| HLL | Unsupported datatype |
| Bitmap | Unsupported datatype |

* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。


## 常见问题
1. **如何写入 Bitmap 类型?**

在 Spark SQL 中,通过 insert into 方式写入数据时,如果 doris 的目标表中包含 `BITMAP` 或 `HLL` 类型的数据时,需要设置参数 `doris.ignore-type` 为对应类型,并通过 `doris.write.fields` 对列进行映射转换,使用方式如下:
> BITMAP
> ```sql
> CREATE TEMPORARY VIEW spark_doris
Expand Down Expand Up @@ -333,13 +361,11 @@ kafkaSource.selectExpr("CAST(value as STRING)")
> );
> ```

2. **如何使用overwrite写入?**

2. 从 1.3.0 版本开始, `doris.sink.max-retries` 配置项的默认值为 0,即默认不进行重试。
当设置该参数大于 0 时,会进行批次级别的失败重试,会在 Spark Executor 内存中缓存 `doris.sink.batch.size` 所配置大小的数据,可能需要适当增大内存分配。

3. 从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下
从 1.3.0 版本开始,支持 overwrite 模式写入(只支持全表级别的数据覆盖),具体使用方式如下
> DataFrame
> ```scala
> ```java
> resultDf.format("doris")
> .option("doris.fenodes","$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
> // your own options
Expand All @@ -353,28 +379,4 @@ kafkaSource.selectExpr("CAST(value as STRING)")
> SELECT * FROM your_source_table
> ```

:::

## Doris 和 Spark 列类型映射关系

| Doris Type | Spark Type |
|------------|----------------------------------|
| NULL_TYPE | DataTypes.NullType |
| BOOLEAN | DataTypes.BooleanType |
| TINYINT | DataTypes.ByteType |
| SMALLINT | DataTypes.ShortType |
| INT | DataTypes.IntegerType |
| BIGINT | DataTypes.LongType |
| FLOAT | DataTypes.FloatType |
| DOUBLE | DataTypes.DoubleType |
| DATE | DataTypes.DateType |
| DATETIME | DataTypes.StringType<sup>1</sup> |
| DECIMAL | DecimalType |
| CHAR | DataTypes.StringType |
| LARGEINT | DecimalType |
| VARCHAR | DataTypes.StringType |
| TIME | DataTypes.DoubleType |
| HLL | Unsupported datatype |
| Bitmap | Unsupported datatype |

* 注:Connector 中,将`DATETIME`映射为`String`。由于`Doris`底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用 `String` 类型直接返回对应的时间可读文本。
Loading
Loading