
Commit

[fix](fe-meta) fix paimon file catalog meta issue and replication num analysis issue (#24683)

Cherry-pick #24681
morningman authored and xiaokang committed Sep 23, 2023
1 parent 77e453d commit 4ece7aa
Showing 6 changed files with 137 additions and 63 deletions.
40 changes: 21 additions & 19 deletions docs/en/docs/lakehouse/multi-catalog/paimon.md
@@ -43,17 +43,19 @@ Paimon Catalog Currently supports two types of Metastore creation catalogs:

### Creating a Catalog Based on FileSystem

> For versions 2.0.1 and earlier, please use the following `Create Catalog based on Hive Metastore`.
#### HDFS
```sql
CREATE CATALOG `paimon_hdfs` PROPERTIES (
"type" = "paimon",
"warehouse" = "hdfs://HDFS8000871/user/paimon",
"dfs.nameservices"="HDFS8000871",
"dfs.ha.namenodes.HDFS8000871"="nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username"="hadoop"
"dfs.nameservices" = "HDFS8000871",
"dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username" = "hadoop"
);

```
@@ -69,9 +71,9 @@ CREATE CATALOG `paimon_hdfs` PROPERTIES (
CREATE CATALOG `paimon_s3` PROPERTIES (
"type" = "paimon",
"warehouse" = "s3://paimon-1308700295.cos.ap-beijing.myqcloud.com/paimoncos",
"s3.endpoint"="cos.ap-beijing.myqcloud.com",
"s3.access_key"="ak",
"s3.secret_key"="sk"
"s3.endpoint" = "cos.ap-beijing.myqcloud.com",
"s3.access_key" = "ak",
"s3.secret_key" = "sk"
);

```
@@ -88,9 +90,9 @@ CREATE CATALOG `paimon_s3` PROPERTIES (
CREATE CATALOG `paimon_oss` PROPERTIES (
"type" = "paimon",
"warehouse" = "oss://paimon-zd/paimonoss",
"oss.endpoint"="oss-cn-beijing.aliyuncs.com",
"oss.access_key"="ak",
"oss.secret_key"="sk"
"oss.endpoint" = "oss-cn-beijing.aliyuncs.com",
"oss.access_key" = "ak",
"oss.secret_key" = "sk"
);

```
@@ -100,15 +102,15 @@ CREATE CATALOG `paimon_oss` PROPERTIES (
```sql
CREATE CATALOG `paimon_hms` PROPERTIES (
"type" = "paimon",
"paimon.catalog.type"="hms",
"paimon.catalog.type" = "hms",
"warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon2",
"hive.metastore.uris" = "thrift://172.21.0.44:7004",
"dfs.nameservices'='HDFS8000871",
"dfs.ha.namenodes.HDFS8000871'='nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username"="hadoop"
"dfs.nameservices" = "HDFS8000871",
"dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username" = "hadoop"
);

```
40 changes: 21 additions & 19 deletions docs/zh-CN/docs/lakehouse/multi-catalog/paimon.md
@@ -43,17 +43,19 @@ Paimon Catalog currently supports two types of Metastore for creating catalogs:

### Creating a Catalog Based on FileSystem

> For versions 2.0.1 and earlier, please use the `Create Catalog based on Hive Metastore` method described later.
#### HDFS
```sql
CREATE CATALOG `paimon_hdfs` PROPERTIES (
"type" = "paimon",
"warehouse" = "hdfs://HDFS8000871/user/paimon",
"dfs.nameservices"="HDFS8000871",
"dfs.ha.namenodes.HDFS8000871"="nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username"="hadoop"
"dfs.nameservices" = "HDFS8000871",
"dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username" = "hadoop"
);

```
@@ -69,9 +71,9 @@ CREATE CATALOG `paimon_hdfs` PROPERTIES (
CREATE CATALOG `paimon_s3` PROPERTIES (
"type" = "paimon",
"warehouse" = "s3://paimon-1308700295.cos.ap-beijing.myqcloud.com/paimoncos",
"s3.endpoint"="cos.ap-beijing.myqcloud.com",
"s3.access_key"="ak",
"s3.secret_key"="sk"
"s3.endpoint" = "cos.ap-beijing.myqcloud.com",
"s3.access_key" = "ak",
"s3.secret_key" = "sk"
);

```
@@ -87,9 +89,9 @@ CREATE CATALOG `paimon_s3` PROPERTIES (
CREATE CATALOG `paimon_oss` PROPERTIES (
"type" = "paimon",
"warehouse" = "oss://paimon-zd/paimonoss",
"oss.endpoint"="oss-cn-beijing.aliyuncs.com",
"oss.access_key"="ak",
"oss.secret_key"="sk"
"oss.endpoint" = "oss-cn-beijing.aliyuncs.com",
"oss.access_key" = "ak",
"oss.secret_key" = "sk"
);

```
@@ -99,15 +101,15 @@ CREATE CATALOG `paimon_oss` PROPERTIES (
```sql
CREATE CATALOG `paimon_hms` PROPERTIES (
"type" = "paimon",
"paimon.catalog.type"="hms",
"paimon.catalog.type" = "hms",
"warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon2",
"hive.metastore.uris" = "thrift://172.21.0.44:7004",
"dfs.nameservices'='HDFS8000871",
"dfs.ha.namenodes.HDFS8000871'='nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username"="hadoop"
"dfs.nameservices" = "HDFS8000871",
"dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
"dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
"dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
"dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"hadoop.username" = "hadoop"
);

```
@@ -480,7 +480,8 @@ public void buildReplicaAllocation() {
// Must copy the properties because "analyzeReplicaAllocation" will remove the property
// from the properties.
Map<String, String> copiedProperties = Maps.newHashMap(properties);
this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(copiedProperties, "default");
this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocationWithoutCheck(
copiedProperties, "default");
} catch (AnalysisException e) {
// should not happen
LOG.error("should not happen when build replica allocation", e);
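The one-line change above is the replication-number half of the fix: `buildReplicaAllocation()` runs when the FE replays database metadata from its edit log, a point at which backends may not have registered yet, so the replay path now calls a variant of the analyzer that skips the backend-capacity check. The sketch below is an illustrative contrast rather than code from the patch; the wrapper class, the empty prefix, and the import paths are assumptions, while the two `PropertyAnalyzer` entry points and their signatures come from the diff further down.

```java
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;

// Hypothetical helper (not part of the patch) contrasting the two code paths.
public class ReplicaAllocPaths {

    // User-facing DDL: keep the check that live backends can host the replicas.
    public static ReplicaAllocation fromDdl(Map<String, String> props) throws AnalysisException {
        // Copy first: the analyzer removes the replication property it consumes.
        return PropertyAnalyzer.analyzeReplicaAllocation(Maps.newHashMap(props), "");
    }

    // Edit-log replay: the properties were already validated when they were written,
    // and backends may still be offline, so skip the backend check.
    public static ReplicaAllocation fromReplay(Map<String, String> props) throws AnalysisException {
        return PropertyAnalyzer.analyzeReplicaAllocationWithoutCheck(Maps.newHashMap(props), "");
    }
}
```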
@@ -132,13 +132,13 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_COMPACTION_POLICY = "compaction_policy";

public static final String PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES =
"time_series_compaction_goal_size_mbytes";
"time_series_compaction_goal_size_mbytes";

public static final String PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD =
"time_series_compaction_file_count_threshold";
"time_series_compaction_file_count_threshold";

public static final String PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS =
"time_series_compaction_time_threshold_seconds";
"time_series_compaction_time_threshold_seconds";
public static final String PROPERTIES_MUTABLE = "mutable";

public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -151,7 +151,7 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_BINLOG_MAX_HISTORY_NUMS = "binlog.max_history_nums";

public static final String PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
"enable_duplicate_without_keys_by_default";
"enable_duplicate_without_keys_by_default";
// For unique key data model, the feature Merge-on-Write will leverage a primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
// which can avoid the merging cost in read stage, and accelerate the aggregation
@@ -172,8 +172,6 @@ public class PropertyAnalyzer {
public static final long TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;




/**
* check and replace members of DataProperty by properties.
*
@@ -585,7 +583,7 @@ public static Boolean analyzeEnableSingleReplicaCompaction(Map<String, String> p
}

public static Boolean analyzeEnableDuplicateWithoutKeysByDefault(Map<String, String> properties)
throws AnalysisException {
throws AnalysisException {
if (properties == null || properties.isEmpty()) {
return false;
}
Expand Down Expand Up @@ -650,7 +648,7 @@ public static String analyzeCompactionPolicy(Map<String, String> properties) thr
compactionPolicy = properties.get(PROPERTIES_COMPACTION_POLICY);
properties.remove(PROPERTIES_COMPACTION_POLICY);
if (compactionPolicy != null && !compactionPolicy.equals(TIME_SERIES_COMPACTION_POLICY)
&& !compactionPolicy.equals(SIZE_BASED_COMPACTION_POLICY)) {
&& !compactionPolicy.equals(SIZE_BASED_COMPACTION_POLICY)) {
throw new AnalysisException(PROPERTIES_COMPACTION_POLICY
+ " must be " + TIME_SERIES_COMPACTION_POLICY + " or " + SIZE_BASED_COMPACTION_POLICY);
}
@@ -660,7 +658,7 @@ public static String analyzeCompactionPolicy(Map<String, String> properties) thr
}

public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String> properties)
throws AnalysisException {
throws AnalysisException {
long goalSizeMbytes = TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
if (properties == null || properties.isEmpty()) {
return goalSizeMbytes;
@@ -672,7 +670,7 @@ public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String>
goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
if (goalSizeMbytes < 10) {
throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be"
+ " less than 10: " + goalSizeMbytesStr);
+ " less than 10: " + goalSizeMbytesStr);
}
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: "
@@ -683,31 +681,31 @@ public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String>
}

public static long analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
throws AnalysisException {
throws AnalysisException {
long fileCountThreshold = TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
if (properties == null || properties.isEmpty()) {
return fileCountThreshold;
}
if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
String fileCountThresholdStr = properties
.get(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
.get(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
try {
fileCountThreshold = Long.parseLong(fileCountThresholdStr);
if (fileCountThreshold < 10) {
throw new AnalysisException("time_series_compaction_file_count_threshold can not be "
+ "less than 10: " + fileCountThresholdStr);
+ "less than 10: " + fileCountThresholdStr);
}
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: "
+ fileCountThresholdStr);
+ fileCountThresholdStr);
}
}
return fileCountThreshold;
}

public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, String> properties)
throws AnalysisException {
throws AnalysisException {
long timeThresholdSeconds = TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
if (properties == null || properties.isEmpty()) {
return timeThresholdSeconds;
@@ -719,11 +717,11 @@ public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, S
timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr);
if (timeThresholdSeconds < 60) {
throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be"
+ " less than 60: " + timeThresholdSecondsStr);
+ " less than 60: " + timeThresholdSecondsStr);
}
} catch (NumberFormatException e) {
throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: "
+ timeThresholdSecondsStr);
+ timeThresholdSecondsStr);
}
}
return timeThresholdSeconds;
@@ -959,14 +957,27 @@ public static boolean analyzeIsBeingSynced(Map<String, String> properties, boole
return defaultValue;
}

// analyze replica allocation property without checking if backends can satisfy the allocation
// mainly used for metadata replay.
public static ReplicaAllocation analyzeReplicaAllocationWithoutCheck(Map<String, String> properties,
String prefix) throws AnalysisException {
return analyzeReplicaAllocationImpl(properties, prefix, false);
}

public static ReplicaAllocation analyzeReplicaAllocation(Map<String, String> properties, String prefix)
throws AnalysisException {
return analyzeReplicaAllocationImpl(properties, prefix, true);
}

// There are 2 kinds of replication property:
// 1. "replication_num" = "3"
// 2. "replication_allocation" = "tag.location.zone1: 2, tag.location.zone2: 1"
// These 2 kinds of property will all be converted to a ReplicaAllocation and return.
// Return ReplicaAllocation.NOT_SET if no replica property is set.
//
// prefix is for property key such as "dynamic_partition.replication_num", which prefix is "dynamic_partition"
public static ReplicaAllocation analyzeReplicaAllocation(Map<String, String> properties, String prefix)
private static ReplicaAllocation analyzeReplicaAllocationImpl(Map<String, String> properties, String prefix,
boolean checkBackends)
throws AnalysisException {
if (properties == null || properties.isEmpty()) {
return ReplicaAllocation.NOT_SET;
@@ -1010,12 +1021,14 @@ public static ReplicaAllocation analyzeReplicaAllocation(Map<String, String> pro

// Check if the current backends satisfy the ReplicaAllocation condition,
// to avoid user set it success but failed to create table or dynamic partitions
try {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
systemInfoService.selectBackendIdsForReplicaCreation(
replicaAlloc, null, false, true);
} catch (DdlException ddlException) {
throw new AnalysisException(ddlException.getMessage());
if (checkBackends) {
try {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
systemInfoService.selectBackendIdsForReplicaCreation(
replicaAlloc, null, false, true);
} catch (DdlException ddlException) {
throw new AnalysisException(ddlException.getMessage());
}
}
}
if (totalReplicaNum < Config.min_replication_num_per_tablet
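As the comments in the PropertyAnalyzer changes above note, the analyzer accepts either a plain `replication_num` or a tag-based `replication_allocation` string, and both resolve to a `ReplicaAllocation`. The snippet below is a minimal usage sketch, not code from the patch: the map contents mirror the examples in those comments, the empty prefix assumes bare property keys, the import paths are guesses at the Doris tree, and it uses the new no-check entry point so it does not depend on a running cluster.

```java
import java.util.Map;

import com.google.common.collect.Maps;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;

// Illustrative only: the two equivalent ways to express replication.
public class ReplicationPropertyForms {
    public static void main(String[] args) throws AnalysisException {
        Map<String, String> byNum = Maps.newHashMap();
        byNum.put("replication_num", "3");

        Map<String, String> byTag = Maps.newHashMap();
        byTag.put("replication_allocation", "tag.location.zone1: 2, tag.location.zone2: 1");

        // A prefixed caller (e.g. dynamic partitioning) would instead pass
        // "dynamic_partition" and use keys like "dynamic_partition.replication_num".
        ReplicaAllocation a1 = PropertyAnalyzer.analyzeReplicaAllocationWithoutCheck(byNum, "");
        ReplicaAllocation a2 = PropertyAnalyzer.analyzeReplicaAllocationWithoutCheck(byTag, "");
        System.out.println(a1 + " vs " + a2);
    }
}
```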
@@ -67,6 +67,7 @@
import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
@@ -202,6 +203,7 @@ public class GsonUtils {
.registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
// routine load data source
private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory =
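The GsonUtils change above is the catalog-metadata half of the fix: FE catalog objects are persisted as JSON through a `RuntimeTypeAdapterFactory`, and a subclass that was never registered with that factory cannot be serialized or replayed, which is presumably the Paimon file catalog meta issue named in the title. The self-contained sketch below reproduces that failure mode with toy classes; the import path, the `"clazz"` type field, and the exact exception text are assumptions based on the gson-extras pattern rather than the actual Doris classes.

```java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
// RuntimeTypeAdapterFactory ships with gson-extras; the package may differ
// in other distributions, and Doris bundles its own equivalent.
import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;

// Toy hierarchy only; not the Doris catalog classes.
public class SubtypeRegistrationDemo {
    abstract static class Catalog {}
    static class HmsCatalog extends Catalog {}
    static class FileCatalog extends Catalog {}

    public static void main(String[] args) {
        RuntimeTypeAdapterFactory<Catalog> factory = RuntimeTypeAdapterFactory
                .of(Catalog.class, "clazz")
                .registerSubtype(HmsCatalog.class, HmsCatalog.class.getSimpleName());
        // FileCatalog is deliberately NOT registered, mirroring the original bug.

        Gson gson = new GsonBuilder().registerTypeAdapterFactory(factory).create();

        System.out.println(gson.toJson(new HmsCatalog(), Catalog.class)); // {"clazz":"HmsCatalog"}

        // The unregistered subtype fails to serialize (and an unknown label would
        // fail to deserialize), typically with a JsonParseException asking whether
        // you forgot to register a subtype.
        System.out.println(gson.toJson(new FileCatalog(), Catalog.class));
    }
}
```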