
Commit

Merge branch 'apache:master' into master
wongxingjun authored Apr 13, 2024
2 parents 4e0f5e9 + c0d889f commit 805dc5f
Showing 1,443 changed files with 67,400 additions and 10,509 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
@@ -39,7 +39,7 @@

## Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).

*(Please pick either of the following options)*

7 changes: 3 additions & 4 deletions .github/workflows/docs.yml
@@ -31,7 +31,6 @@ jobs:
- release-1.19
- release-1.18
- release-1.17
- release-1.16
steps:
- uses: actions/checkout@v3
with:
@@ -43,13 +42,13 @@
echo "flink_branch=${currentBranch}" >> ${GITHUB_ENV}
if [ "${currentBranch}" = "master" ]; then
echo "flink_alias=release-1.19" >> ${GITHUB_ENV}
elif [ "${currentBranch}" = "release-1.18" ]; then
echo "flink_alias=release-1.20" >> ${GITHUB_ENV}
elif [ "${currentBranch}" = "release-1.19" ]; then
echo "flink_alias=stable" >> ${GITHUB_ENV}
fi
- name: Build documentation
run: |
docker run --rm --volume "$PWD:/root/flink" mapohl/flink-ci:FLINK-34194 bash -c "cd /root/flink && ./.github/workflows/docs.sh"
docker run --rm --volume "$PWD:/root/flink" chesnay/flink-ci:java_8_11_17_21_maven_386 bash -c "cd /root/flink && ./.github/workflows/docs.sh"
- name: Upload documentation
uses: burnett01/[email protected]
with:
8 changes: 4 additions & 4 deletions azure-pipelines.yml
@@ -39,7 +39,7 @@ resources:
# Container with SSL to have the same environment everywhere.
# see https://github.com/apache/flink-connector-shared-utils/tree/ci_utils
- container: flink-build-container
image: mapohl/flink-ci:FLINK-34194
image: chesnay/flink-ci:java_8_11_17_21_maven_386
# On AZP provided machines, set this flag to allow writing coredumps in docker
options: --privileged

@@ -73,16 +73,16 @@ stages:
parameters: # see template file for a definition of the parameters.
stage_name: ci_build
test_pool_definition:
vmImage: 'ubuntu-22.04'
vmImage: 'ubuntu-20.04'
e2e_pool_definition:
vmImage: 'ubuntu-22.04'
vmImage: 'ubuntu-20.04'
environment: PROFILE="-Dflink.hadoop.version=2.10.2"
run_end_to_end: false
container: flink-build-container
jdk: 8
- job: docs_404_check # run on a MSFT provided machine
pool:
vmImage: 'ubuntu-22.04'
vmImage: 'ubuntu-20.04'
steps:
- task: GoTool@0
inputs:
1 change: 1 addition & 0 deletions docs/config.toml
@@ -76,6 +76,7 @@ pygmentsUseClasses = true
]

PreviousDocs = [
["1.19", "http://nightlies.apache.org/flink/flink-docs-release-1.19"],
["1.18", "http://nightlies.apache.org/flink/flink-docs-release-1.18"],
["1.17", "http://nightlies.apache.org/flink/flink-docs-release-1.17"],
["1.16", "http://nightlies.apache.org/flink/flink-docs-release-1.16"],
9 changes: 9 additions & 0 deletions docs/content.zh/docs/connectors/table/formats/avro.md
@@ -86,6 +86,15 @@ Format 参数
<td>String</td>
      <td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">filesystem</a> only, the avro compression codec. Snappy compression by default. Currently supported values: null, deflate, snappy, bzip2, xz.</td>
</tr>
<tr>
<td><h5>timestamp_mapping.legacy</h5></td>
<td>可选</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
      <td>Use the legacy mapping for timestamps in Avro. Before 1.19, Flink's default behavior incorrectly mapped both the SQL TIMESTAMP and TIMESTAMP_LTZ types to Avro TIMESTAMP.<br>
       The correct behavior is that Flink SQL TIMESTAMP maps to Avro LOCAL TIMESTAMP and Flink SQL TIMESTAMP_LTZ maps to Avro TIMESTAMP; you can obtain the correct mapping by disabling this legacy mapping.<br>
       The legacy behavior is kept as the default for compatibility.</td>
</tr>
</tbody>
</table>

38 changes: 33 additions & 5 deletions docs/content.zh/docs/connectors/table/formats/debezium.md
@@ -44,7 +44,7 @@ Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Deb
Dependencies
------------

#### Debezium Avro
#### Debezium Confluent Avro

{{< sql_download_table "debezium-avro-confluent" >}}

@@ -84,7 +84,9 @@ Debezium 为变更日志提供了统一的格式,这是一个 JSON 格式的

*Note: Please refer to the [Debezium documentation](https://debezium.io/documentation/reference/1.3/connectors/mysql.html#mysql-connector-events_debezium) for the meaning of each field.*

The MySQL products table has 4 columns (`id`, `name`, `description`, `weight`). The JSON message above is an update event on the `products` table, in which the `weight` value of the row with `id = 111` changed from `5.18` to `5.15`. Assuming this message has been synchronized to the Kafka topic `products_binlog`, the following DDL can be used to consume this topic and parse the change events.
The MySQL products table has 4 columns (`id`, `name`, `description`, `weight`). The JSON message above is an update event on the `products` table, in which the `weight` value of the row with `id = 111` changed from `5.18` to `5.15`. Assuming this message has been synchronized to the Kafka topic `products_binlog`, the following DDL (for both Debezium JSON and Debezium Confluent Avro) can be used to consume this topic and parse the change events.

#### Debezium JSON DDL

{{< tabs "0b6703c1-021e-4506-a579-b72b8408c0cf" >}}
{{< tab "SQL" >}}
@@ -101,8 +103,7 @@ CREATE TABLE topic_products (
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
-- use the 'debezium-json' format to interpret Debezium JSON messages
-- if Debezium encodes the messages with Avro, use 'debezium-avro-confluent'
'format' = 'debezium-json' -- if Debezium encodes the messages with Avro, use 'debezium-avro-confluent'
'format' = 'debezium-json'
)
```
{{< /tab >}}
@@ -136,7 +137,34 @@

In order to interpret such messages, you need to add the option `'debezium-json.schema-include' = 'true'` to the DDL WITH clause above (false by default). Usually it is recommended not to include the schema description, because it makes the messages very verbose and reduces parsing performance.

After registering the topic as a Flink table, the Debezium messages can be used as a changelog source.
#### Debezium Confluent Avro DDL

{{< tabs "b3832d51-f57b-4c1f-87aa-5356fba23047" >}}
{{< tab "SQL" >}}
```sql
CREATE TABLE topic_products (
  -- the schema is exactly the same as the MySQL products table
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
  -- use the 'debezium-avro-confluent' format to interpret Debezium Avro messages
'format' = 'debezium-avro-confluent',
  -- the URL of the Kafka Schema Registry
'debezium-avro-confluent.url' = 'http://localhost:8081'
)
```
{{< /tab >}}
{{< /tabs >}}

#### Result output

For either data format, after registering the topic as a Flink table, the Debezium messages can be used as a changelog source.
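
For reference, a minimal Java sketch of this pattern: register the Debezium-backed topic (here with the `debezium-json` DDL from this page) and run a continuous query over it. The broker address, topic name and the aggregate query are illustrative assumptions, not prescribed by the documentation.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumChangelogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // register the Debezium-backed topic as a table (debezium-json variant);
        // broker address and topic name are placeholders
        tEnv.executeSql(
                "CREATE TABLE topic_products ("
                        + "  id BIGINT,"
                        + "  name STRING,"
                        + "  description STRING,"
                        + "  weight DECIMAL(10, 2)"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'products_binlog',"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',"
                        + "  'properties.group.id' = 'testGroup',"
                        + "  'format' = 'debezium-json'"
                        + ")");

        // a continuous query over the changelog source: INSERT/UPDATE/DELETE
        // events captured by Debezium keep this aggregate up to date
        tEnv.executeSql("SELECT COUNT(*) AS cnt, AVG(weight) AS avg_weight FROM topic_products")
            .print();
    }
}
```

Because the source is interpreted as a changelog, updates and deletes captured by Debezium retract and re-emit the affected rows, so the aggregate stays consistent with the MySQL table.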

{{< tabs "6a84a0e8-2e56-49db-9089-e836290f8239" >}}
{{< tab "SQL" >}}
2 changes: 1 addition & 1 deletion docs/content.zh/docs/deployment/config.md
@@ -562,7 +562,7 @@ These options are for the network stack that handles the streaming and batch dat
Flink uses Pekko for RPC between components (JobManager/TaskManager/ResourceManager).
Flink does not use Pekko for data transport.

{{< generated/akka_configuration >}}
{{< generated/rpc_configuration >}}

----
----
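
As a rough illustration of how one of the generated RPC options might be applied, here is a sketch that assumes the `pekko.ask.timeout` key from the Pekko-based RPC option table; in a real deployment these keys would normally go into the Flink configuration file rather than code.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RpcConfigSketch {
    public static void main(String[] args) throws Exception {
        // raise the RPC ask timeout; 'pekko.ask.timeout' is assumed to be one
        // of the keys listed in the generated RPC configuration table
        Configuration conf = new Configuration();
        conf.setString("pekko.ask.timeout", "30 s");

        // a local environment picks up the configuration directly
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(conf);

        env.fromElements(1, 2, 3).print();
        env.execute("rpc-config-sketch");
    }
}
```
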
8 changes: 4 additions & 4 deletions docs/content.zh/docs/dev/table/data_stream_api.md
@@ -523,7 +523,7 @@ We recommend setting all configuration options in DataStream API early before sw
{{< tab "Java" >}}
```java
import java.time.ZoneId;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

@@ -537,7 +537,7 @@ env.setMaxParallelism(256);

env.getConfig().addDefaultKryoSerializer(MyCustomType.class, CustomKryoSerializer.class);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);

// then switch to Java Table API

@@ -553,9 +553,9 @@ tableEnv.getConfig().setLocalTimeZone(ZoneId.of("Europe/Berlin"));
{{< tab "Scala" >}}
```scala
import java.time.ZoneId
import org.apache.flink.core.execution.CheckpointingMode
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.table.api.bridge.scala._

// create Scala DataStream API
@@ -568,7 +568,7 @@ env.setMaxParallelism(256)

env.getConfig.addDefaultKryoSerializer(classOf[MyCustomType], classOf[CustomKryoSerializer])

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE)

// then switch to Scala Table API

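Taken together, the updated snippets above amount to the following self-contained Java sketch. The class name and checkpoint interval are illustrative; the `org.apache.flink.core.execution.CheckpointingMode` location and the renamed `setCheckpointingConsistencyMode` setter are as shown in this diff.

```java
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CheckpointingModeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // CheckpointingMode now lives in flink-core, and the setter on
        // CheckpointConfig is called setCheckpointingConsistencyMode
        env.enableCheckpointing(10_000);
        env.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);

        // switching to the Table API afterwards works as before
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    }
}
```
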
89 changes: 89 additions & 0 deletions docs/content.zh/docs/dev/table/sql/show.md
@@ -34,6 +34,7 @@ SHOW CREATE 语句用于打印给定对象的创建 DDL 语句。当前的 SHOW
Currently, Flink SQL supports the following SHOW statements:
- SHOW CATALOGS
- SHOW CURRENT CATALOG
- SHOW CREATE CATALOG
- SHOW DATABASES
- SHOW CURRENT DATABASE
- SHOW TABLES
@@ -102,6 +103,22 @@ tEnv.executeSql("SHOW CURRENT CATALOG").print();
// | default_catalog |
// +----------------------+

// create a catalog
tEnv.executeSql("CREATE CATALOG cat2 WITH (...)");

// show create catalog
tEnv.executeSql("SHOW CREATE CATALOG cat2").print();
// +---------------------------------------------------------------------------------------------+
// | result |
// +---------------------------------------------------------------------------------------------+
// | CREATE CATALOG `cat2` WITH (
// 'default-database' = 'db',
// 'type' = 'generic_in_memory'
// )
// |
// +---------------------------------------------------------------------------------------------+
// 1 row in set

// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
@@ -214,6 +231,22 @@ tEnv.executeSql("SHOW CATALOGS").print()
// | default_catalog |
// +-----------------+

// create a catalog
tEnv.executeSql("CREATE CATALOG cat2 WITH (...)")

// show create catalog
tEnv.executeSql("SHOW CREATE CATALOG cat2").print()
// +---------------------------------------------------------------------------------------------+
// | result |
// +---------------------------------------------------------------------------------------------+
// | CREATE CATALOG `cat2` WITH (
// 'default-database' = 'db',
// 'type' = 'generic_in_memory'
// )
// |
// +---------------------------------------------------------------------------------------------+
// 1 row in set

// show databases
tEnv.executeSql("SHOW DATABASES").print()
// +------------------+
@@ -316,6 +349,22 @@ table_env.execute_sql("SHOW CATALOGS").print()
# | default_catalog |
# +-----------------+

# create a catalog
table_env.execute_sql("CREATE CATALOG cat2 WITH (...)")

# show create catalog
table_env.execute_sql("SHOW CREATE CATALOG cat2").print()
# +---------------------------------------------------------------------------------------------+
# | result |
# +---------------------------------------------------------------------------------------------+
# | CREATE CATALOG `cat2` WITH (
# 'default-database' = 'db',
# 'type' = 'generic_in_memory'
# )
# |
# +---------------------------------------------------------------------------------------------+
# 1 row in set

# show databases
table_env.execute_sql("SHOW DATABASES").print()
# +------------------+
@@ -411,6 +460,14 @@ table_env.execute_sql("SHOW FULL MODULES").print()
Flink SQL> SHOW CATALOGS;
default_catalog

Flink SQL> CREATE CATALOG cat2 WITH (...);
[INFO] Execute statement succeeded.

Flink SQL> SHOW CREATE CATALOG cat2;
CREATE CATALOG `cat2` WITH (
...
)

Flink SQL> SHOW DATABASES;
default_database

@@ -504,6 +561,38 @@ SHOW CURRENT CATALOG

Shows the catalog that is currently in use.

## SHOW CREATE CATALOG

```sql
SHOW CREATE CATALOG catalog_name
```

Shows the creation statement of an existing catalog.

The output includes the catalog's name and its properties, giving you a direct view of the catalog's metadata.

Suppose `cat2` was created as follows:
```sql
create catalog cat2 WITH (
'type'='generic_in_memory',
'default-database'='db'
);
```
Show the creation statement of the catalog.
```sql
show create catalog cat2;
+---------------------------------------------------------------------------------------------+
| result |
+---------------------------------------------------------------------------------------------+
| CREATE CATALOG `cat2` WITH (
'default-database' = 'db',
'type' = 'generic_in_memory'
)
|
+---------------------------------------------------------------------------------------------+
1 row in set
```

## SHOW DATABASES

```sql
2 changes: 1 addition & 1 deletion docs/content.zh/docs/flinkDev/ide_setup.md
@@ -103,7 +103,7 @@ IntelliJ 使用 Checkstyle-IDEA 插件在 IDE 中支持 checkstyle。
1. Install the "Checkstyle-IDEA" plugin from the IntelliJ plugin repository.
2. Configure the plugin via Settings → Tools → Checkstyle.
3. Set "Scan Scope" to Java sources only (including tests).
4. Select version _8.14_ in the "Checkstyle Version" dropdown and click "apply". **This step is important, don't skip it!**
4. Select version _9.3_ in the "Checkstyle Version" dropdown and click "apply". **This step is important, don't skip it!**
5. In the "Configuration File" pane, click the "+" icon to add a new configuration:
    1. Set "Description" to Flink.
    2. Select "Use a local Checkstyle file" and point it to the `"tools/maven/checkstyle.xml"` file in your repository.
12 changes: 11 additions & 1 deletion docs/content.zh/docs/ops/metrics.md
@@ -1632,7 +1632,7 @@ Note that the metrics are only available via reporters.
<td>Histogram</td>
</tr>
<tr>
<th rowspan="24"><strong>Task</strong></th>
<th rowspan="27"><strong>Task</strong></th>
<td>numBytesInLocal</td>
<td><span class="label label-danger">Attention:</span> deprecated, use <a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default shuffle service metrics</a>.</td>
<td>Counter</td>
@@ -1692,6 +1692,16 @@ Note that the metrics are only available via reporters.
<td>The number of network buffers this task emits per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>numFiredTimers</td>
<td>The total number of timers this task has fired.</td>
<td>Counter</td>
</tr>
<tr>
<td>numFiredTimersPerSecond</td>
<td>The number of timers this task fires per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>isBackPressured</td>
<td>Whether the task is back-pressured.</td>
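
The two new metrics count the timers a task fires. As context, here is a minimal sketch (hypothetical class and job names) of a keyed function whose registered processing-time timers would be reflected in `numFiredTimers` and `numFiredTimersPerSecond` once they fire.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class FiredTimersSketch {

    // every timer registered here and later fired by the task is counted by the
    // task-level numFiredTimers / numFiredTimersPerSecond metrics
    public static class DelayedEcho
            extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) {
            // fire a processing-time timer 10 seconds after the element arrives
            long fireAt = ctx.timerService().currentProcessingTime() + 10_000L;
            ctx.timerService().registerProcessingTimeTimer(fireAt);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L))
                .keyBy(t -> t.f0)
                .process(new DelayedEcho())
                .print();
        env.execute("fired-timers-sketch");
    }
}
```
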
10 changes: 10 additions & 0 deletions docs/content/docs/connectors/table/formats/avro.md
@@ -98,6 +98,16 @@ Format Options
<td>String</td>
<td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">Filesystem</a> only, the compression codec for avro. Snappy compression as default. The valid enumerations are: null, deflate, snappy, bzip2, xz.</td>
</tr>
<tr>
<td><h5>timestamp_mapping.legacy</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
      <td>Use the legacy mapping of timestamps in Avro. Before 1.19, the default behavior of Flink incorrectly mapped both the SQL TIMESTAMP and TIMESTAMP_LTZ types to Avro TIMESTAMP. <br>
       The correct behavior is that Flink SQL TIMESTAMP maps to Avro LOCAL TIMESTAMP and Flink SQL TIMESTAMP_LTZ maps to Avro TIMESTAMP; you can obtain the correct mapping by disabling this legacy mapping. <br>
       The legacy behavior is used by default for compatibility reasons. </td>
</tr>
</tbody>
</table>

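A minimal sketch of opting out of the legacy mapping in a table DDL. The table layout and path are placeholders, and the `avro.`-prefixed option key is an assumption based on how the other Avro format options (such as `avro.codec`) are prefixed when the format is embedded in a connector definition.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroTimestampMappingSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // opt out of the legacy mapping so that TIMESTAMP maps to Avro
        // LOCAL TIMESTAMP and TIMESTAMP_LTZ maps to Avro TIMESTAMP;
        // connector, path and column names are placeholders
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  id BIGINT,"
                        + "  created_at TIMESTAMP(3),"
                        + "  ingested_at TIMESTAMP_LTZ(3)"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'file:///tmp/events',"
                        + "  'format' = 'avro',"
                        + "  'avro.timestamp_mapping.legacy' = 'false'"
                        + ")");
    }
}
```
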
2 changes: 1 addition & 1 deletion docs/content/docs/deployment/config.md
@@ -564,7 +564,7 @@ These options are for the network stack that handles the streaming and batch dat
Flink uses Pekko for RPC between components (JobManager/TaskManager/ResourceManager).
Flink does not use Pekko for data transport.

{{< generated/akka_configuration >}}
{{< generated/rpc_configuration >}}

----
----