diff --git a/docs/en/docs/lakehouse/multi-catalog/paimon.md b/docs/en/docs/lakehouse/multi-catalog/paimon.md
index 68bf4cb0112f23..8b9ce2e49fb766 100644
--- a/docs/en/docs/lakehouse/multi-catalog/paimon.md
+++ b/docs/en/docs/lakehouse/multi-catalog/paimon.md
@@ -43,17 +43,19 @@ Paimon Catalog Currently supports two types of Metastore creation catalogs:
 
 ### Creating a Catalog Based on FileSystem
 
+> For versions 2.0.1 and earlier, please use the `Create Catalog based on Hive Metastore` method described below.
+
 #### HDFS
 
 ```sql
 CREATE CATALOG `paimon_hdfs` PROPERTIES (
     "type" = "paimon",
     "warehouse" = "hdfs://HDFS8000871/user/paimon",
-    "dfs.nameservices"="HDFS8000871",
-    "dfs.ha.namenodes.HDFS8000871"="nn1,nn2",
-    "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
-    "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
-    "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
-    "hadoop.username"="hadoop"
+    "dfs.nameservices" = "HDFS8000871",
+    "dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
+    "dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
+    "dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
+    "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+    "hadoop.username" = "hadoop"
 );
 ```
 
@@ -69,9 +71,9 @@ CREATE CATALOG `paimon_hdfs` PROPERTIES (
 CREATE CATALOG `paimon_s3` PROPERTIES (
     "type" = "paimon",
     "warehouse" = "s3://paimon-1308700295.cos.ap-beijing.myqcloud.com/paimoncos",
-    "s3.endpoint"="cos.ap-beijing.myqcloud.com",
-    "s3.access_key"="ak",
-    "s3.secret_key"="sk"
+    "s3.endpoint" = "cos.ap-beijing.myqcloud.com",
+    "s3.access_key" = "ak",
+    "s3.secret_key" = "sk"
 );
 ```
 
@@ -88,9 +90,9 @@ CREATE CATALOG `paimon_s3` PROPERTIES (
 CREATE CATALOG `paimon_oss` PROPERTIES (
     "type" = "paimon",
     "warehouse" = "oss://paimon-zd/paimonoss",
-    "oss.endpoint"="oss-cn-beijing.aliyuncs.com",
-    "oss.access_key"="ak",
-    "oss.secret_key"="sk"
+    "oss.endpoint" = "oss-cn-beijing.aliyuncs.com",
+    "oss.access_key" = "ak",
+    "oss.secret_key" = "sk"
 );
 ```
 
@@ -100,15 +102,15 @@ CREATE CATALOG `paimon_oss` PROPERTIES (
 ```sql
 CREATE CATALOG `paimon_hms` PROPERTIES (
     "type" = "paimon",
-    "paimon.catalog.type"="hms",
+    "paimon.catalog.type" = "hms",
     "warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon2",
     "hive.metastore.uris" = "thrift://172.21.0.44:7004",
-    "dfs.nameservices'='HDFS8000871",
-    "dfs.ha.namenodes.HDFS8000871'='nn1,nn2",
-    "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
-    "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
-    "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
-    "hadoop.username"="hadoop"
+    "dfs.nameservices" = "HDFS8000871",
+    "dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
+    "dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
+    "dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
+    "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+    "hadoop.username" = "hadoop"
 );
 ```
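
The documentation hunks above only cover creating a Paimon catalog. As a quick, hedged sketch of how such a catalog is typically used afterwards (the database name `test_db` and table name `test_tbl` are placeholders, not part of this patch):

```sql
-- Query the FileSystem-based catalog created in the example above.
SWITCH paimon_hdfs;
SHOW DATABASES;
SELECT * FROM paimon_hdfs.test_db.test_tbl LIMIT 10;
-- Pick up tables and snapshots written to Paimon after the catalog was created.
REFRESH CATALOG paimon_hdfs;
```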
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/paimon.md b/docs/zh-CN/docs/lakehouse/multi-catalog/paimon.md
index 73cab27a034dc2..38b7ed425a68bc 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/paimon.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/paimon.md
@@ -43,17 +43,19 @@ Paimon Catalog 当前支持两种类型的Metastore创建Catalog:
 
 ### 基于FileSystem创建Catalog
 
+> 2.0.1 及之前版本，请使用后面的 `基于Hive Metastore创建Catalog`。
+
 #### HDFS
 
 ```sql
 CREATE CATALOG `paimon_hdfs` PROPERTIES (
     "type" = "paimon",
     "warehouse" = "hdfs://HDFS8000871/user/paimon",
-    "dfs.nameservices"="HDFS8000871",
-    "dfs.ha.namenodes.HDFS8000871"="nn1,nn2",
-    "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
-    "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
-    "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
-    "hadoop.username"="hadoop"
+    "dfs.nameservices" = "HDFS8000871",
+    "dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
+    "dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
+    "dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
+    "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+    "hadoop.username" = "hadoop"
 );
 ```
 
@@ -69,9 +71,9 @@ CREATE CATALOG `paimon_hdfs` PROPERTIES (
 CREATE CATALOG `paimon_s3` PROPERTIES (
     "type" = "paimon",
     "warehouse" = "s3://paimon-1308700295.cos.ap-beijing.myqcloud.com/paimoncos",
-    "s3.endpoint"="cos.ap-beijing.myqcloud.com",
-    "s3.access_key"="ak",
-    "s3.secret_key"="sk"
+    "s3.endpoint" = "cos.ap-beijing.myqcloud.com",
+    "s3.access_key" = "ak",
+    "s3.secret_key" = "sk"
 );
 ```
 
@@ -87,9 +89,9 @@ CREATE CATALOG `paimon_s3` PROPERTIES (
 CREATE CATALOG `paimon_oss` PROPERTIES (
     "type" = "paimon",
     "warehouse" = "oss://paimon-zd/paimonoss",
-    "oss.endpoint"="oss-cn-beijing.aliyuncs.com",
-    "oss.access_key"="ak",
-    "oss.secret_key"="sk"
+    "oss.endpoint" = "oss-cn-beijing.aliyuncs.com",
+    "oss.access_key" = "ak",
+    "oss.secret_key" = "sk"
 );
 ```
 
@@ -99,15 +101,15 @@ CREATE CATALOG `paimon_oss` PROPERTIES (
 ```sql
 CREATE CATALOG `paimon_hms` PROPERTIES (
     "type" = "paimon",
-    "paimon.catalog.type"="hms",
+    "paimon.catalog.type" = "hms",
     "warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon2",
     "hive.metastore.uris" = "thrift://172.21.0.44:7004",
-    "dfs.nameservices'='HDFS8000871",
-    "dfs.ha.namenodes.HDFS8000871'='nn1,nn2",
-    "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
-    "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
-    "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
-    "hadoop.username"="hadoop"
+    "dfs.nameservices" = "HDFS8000871",
+    "dfs.ha.namenodes.HDFS8000871" = "nn1,nn2",
+    "dfs.namenode.rpc-address.HDFS8000871.nn1" = "172.21.0.1:4007",
+    "dfs.namenode.rpc-address.HDFS8000871.nn2" = "172.21.0.2:4007",
+    "dfs.client.failover.proxy.provider.HDFS8000871" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+    "hadoop.username" = "hadoop"
 );
 ```
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index b0ae5c469ee4d8..0a6a7e6a6d204d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -480,7 +480,8 @@ public void buildReplicaAllocation() {
             // Must copy the properties because "analyzeReplicaAllocation" will remove the property
             // from the properties.
             Map<String, String> copiedProperties = Maps.newHashMap(properties);
-            this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(copiedProperties, "default");
+            this.replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocationWithoutCheck(
+                    copiedProperties, "default");
         } catch (AnalysisException e) {
             // should not happen
             LOG.error("should not happen when build replica allocation", e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index f916d13336aa18..6a9c4bada1bdd6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -132,13 +132,13 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_COMPACTION_POLICY = "compaction_policy";
 
     public static final String PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES =
-                    "time_series_compaction_goal_size_mbytes";
+            "time_series_compaction_goal_size_mbytes";
 
     public static final String PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD =
-                    "time_series_compaction_file_count_threshold";
+            "time_series_compaction_file_count_threshold";
 
     public static final String PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS =
-                    "time_series_compaction_time_threshold_seconds";
+            "time_series_compaction_time_threshold_seconds";
 
     public static final String PROPERTIES_MUTABLE = "mutable";
     public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced";
@@ -151,7 +151,7 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_BINLOG_MAX_HISTORY_NUMS = "binlog.max_history_nums";
 
     public static final String PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
-                    "enable_duplicate_without_keys_by_default";
+            "enable_duplicate_without_keys_by_default";
     // For unique key data model, the feature Merge-on-Write will leverage a primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
     // which can avoid the merging cost in read stage, and accelerate the aggregation
@@ -172,8 +172,6 @@ public class PropertyAnalyzer {
     public static final long TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600;
 
-
-
     /**
      * check and replace members of DataProperty by properties.
      *
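
The constants touched above name the time-series compaction table properties, and the analyzers further down enforce their minimums. A hedged sketch of how these properties appear in DDL (table, column, and value choices are illustrative; the literal policy value `time_series` is taken from the Doris docs rather than from this hunk):

```sql
CREATE TABLE example_db.event_log (
    ts DATETIME,
    source VARCHAR(32),
    payload STRING
)
DUPLICATE KEY(ts)
DISTRIBUTED BY HASH(ts) BUCKETS 16
PROPERTIES (
    "replication_num" = "1",
    "compaction_policy" = "time_series",
    "time_series_compaction_goal_size_mbytes" = "1024",      -- analyzer rejects values < 10
    "time_series_compaction_file_count_threshold" = "2000",  -- analyzer rejects values < 10
    "time_series_compaction_time_threshold_seconds" = "3600" -- default is 3600
);
```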
@@ -585,7 +583,7 @@ public static Boolean analyzeEnableSingleReplicaCompaction(Map<String, String> p
     }
 
     public static Boolean analyzeEnableDuplicateWithoutKeysByDefault(Map<String, String> properties)
-                    throws AnalysisException {
+            throws AnalysisException {
         if (properties == null || properties.isEmpty()) {
             return false;
         }
@@ -650,7 +648,7 @@ public static String analyzeCompactionPolicy(Map<String, String> properties) thr
         compactionPolicy = properties.get(PROPERTIES_COMPACTION_POLICY);
         properties.remove(PROPERTIES_COMPACTION_POLICY);
         if (compactionPolicy != null && !compactionPolicy.equals(TIME_SERIES_COMPACTION_POLICY)
-                        && !compactionPolicy.equals(SIZE_BASED_COMPACTION_POLICY)) {
+                && !compactionPolicy.equals(SIZE_BASED_COMPACTION_POLICY)) {
             throw new AnalysisException(PROPERTIES_COMPACTION_POLICY
                     + " must be " + TIME_SERIES_COMPACTION_POLICY + " or " + SIZE_BASED_COMPACTION_POLICY);
         }
@@ -660,7 +658,7 @@ public static String analyzeCompactionPolicy(Map<String, String> properties) thr
     }
 
     public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String> properties)
-                    throws AnalysisException {
+            throws AnalysisException {
         long goalSizeMbytes = TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES_DEFAULT_VALUE;
         if (properties == null || properties.isEmpty()) {
             return goalSizeMbytes;
@@ -672,7 +670,7 @@ public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String>
                 goalSizeMbytes = Long.parseLong(goalSizeMbytesStr);
                 if (goalSizeMbytes < 10) {
                     throw new AnalysisException("time_series_compaction_goal_size_mbytes can not be"
-                                + " less than 10: " + goalSizeMbytesStr);
+                            + " less than 10: " + goalSizeMbytesStr);
                 }
             } catch (NumberFormatException e) {
                 throw new AnalysisException("Invalid time_series_compaction_goal_size_mbytes format: "
@@ -683,31 +681,31 @@ public static long analyzeTimeSeriesCompactionGoalSizeMbytes(Map<String, String>
     }
 
     public static long analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties)
-                    throws AnalysisException {
+            throws AnalysisException {
         long fileCountThreshold = TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE;
         if (properties == null || properties.isEmpty()) {
             return fileCountThreshold;
         }
         if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD)) {
             String fileCountThresholdStr = properties
-                            .get(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
+                    .get(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
             properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD);
             try {
                 fileCountThreshold = Long.parseLong(fileCountThresholdStr);
                 if (fileCountThreshold < 10) {
                     throw new AnalysisException("time_series_compaction_file_count_threshold can not be "
-                                + "less than 10: " + fileCountThresholdStr);
+                            + "less than 10: " + fileCountThresholdStr);
                 }
             } catch (NumberFormatException e) {
                 throw new AnalysisException("Invalid time_series_compaction_file_count_threshold format: "
-                                + fileCountThresholdStr);
+                        + fileCountThresholdStr);
             }
         }
         return fileCountThreshold;
     }
 
     public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, String> properties)
-                    throws AnalysisException {
+            throws AnalysisException {
         long timeThresholdSeconds = TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE;
         if (properties == null || properties.isEmpty()) {
             return timeThresholdSeconds;
@@ -719,11 +717,11 @@ public static long analyzeTimeSeriesCompactionTimeThresholdSeconds(Map<String, S
                 timeThresholdSeconds = Long.parseLong(timeThresholdSecondsStr);
                 if (timeThresholdSeconds < 60) {
                     throw new AnalysisException("time_series_compaction_time_threshold_seconds can not be"
-                                + " less than 60: " + timeThresholdSecondsStr);
+                            + " less than 60: " + timeThresholdSecondsStr);
                 }
             } catch (NumberFormatException e) {
                 throw new AnalysisException("Invalid time_series_compaction_time_threshold_seconds format: "
-                                + timeThresholdSecondsStr);
+                        + timeThresholdSecondsStr);
             }
         }
         return timeThresholdSeconds;
@@ -944,6 +942,18 @@ public static boolean analyzeIsBeingSynced(Map<String, String> properties, boole
         return defaultValue;
     }
 
+    // analyze replica allocation property without checking if backends can satisfy the allocation
+    // mainly used for metadata replay.
+    public static ReplicaAllocation analyzeReplicaAllocationWithoutCheck(Map<String, String> properties,
+            String prefix) throws AnalysisException {
+        return analyzeReplicaAllocationImpl(properties, prefix, false);
+    }
+
+    public static ReplicaAllocation analyzeReplicaAllocation(Map<String, String> properties, String prefix)
+            throws AnalysisException {
+        return analyzeReplicaAllocationImpl(properties, prefix, true);
+    }
+
     // There are 2 kinds of replication property:
     // 1. "replication_num" = "3"
     // 2. "replication_allocation" = "tag.location.zone1: 2, tag.location.zone2: 1"
@@ -966,7 +976,8 @@ public static boolean analyzeIsBeingSynced(Map<String, String> properties, boole
     // Return ReplicaAllocation.NOT_SET if no replica property is set.
    //
    // prefix is for property key such as "dynamic_partition.replication_num", which prefix is "dynamic_partition"
-    public static ReplicaAllocation analyzeReplicaAllocation(Map<String, String> properties, String prefix)
+    private static ReplicaAllocation analyzeReplicaAllocationImpl(Map<String, String> properties, String prefix,
+            boolean checkBackends)
             throws AnalysisException {
         if (properties == null || properties.isEmpty()) {
             return ReplicaAllocation.NOT_SET;
@@ -1010,12 +1021,14 @@ public static ReplicaAllocation analyzeReplicaAllocation(Map<String, String> pro
             // Check if the current backends satisfy the ReplicaAllocation condition,
             // to avoid user set it success but failed to create table or dynamic partitions
-            try {
-                SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
-                systemInfoService.selectBackendIdsForReplicaCreation(
-                        replicaAlloc, null, false, true);
-            } catch (DdlException ddlException) {
-                throw new AnalysisException(ddlException.getMessage());
+            if (checkBackends) {
+                try {
+                    SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
+                    systemInfoService.selectBackendIdsForReplicaCreation(
+                            replicaAlloc, null, false, true);
+                } catch (DdlException ddlException) {
+                    throw new AnalysisException(ddlException.getMessage());
+                }
             }
         }
         if (totalReplicaNum < Config.min_replication_num_per_tablet
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index b61b2d5146d65d..2c7c32caa0a4a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -67,6 +67,7 @@
 import org.apache.doris.datasource.iceberg.IcebergRestExternalCatalog;
 import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
+import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonHMSExternalCatalog;
 import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
@@ -202,6 +203,7 @@ public class GsonUtils {
             .registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName())
             .registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
             .registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
+            .registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName())
             .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
     // routine load data source
     private static RuntimeTypeAdapterFactory rdsTypeAdapterFactory =
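
The comment carried along in the PropertyAnalyzer hunk above lists the two equivalent forms of the replication property that analyzeReplicaAllocationImpl parses. For reference, both forms as they appear in CREATE TABLE (table names, tags, and counts are illustrative):

```sql
-- Form 1: a plain replica count.
CREATE TABLE example_db.t_simple (k1 INT, v1 INT)
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 10
PROPERTIES ("replication_num" = "3");

-- Form 2: an explicit allocation across resource tags.
CREATE TABLE example_db.t_tagged (k1 INT, v1 INT)
DUPLICATE KEY(k1)
DISTRIBUTED BY HASH(k1) BUCKETS 10
PROPERTIES ("replication_allocation" = "tag.location.zone1: 2, tag.location.zone2: 1");
```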
diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
new file mode 100644
index 00000000000000..a6e687b3e0ed8d
--- /dev/null
+++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_doris") {
+
+    String file_ctl_name = "paimon_file_catalog";
+    String hms_ctl_name = "paimon_hms_catalog";
+
+    // This is only for testing catalog creation
+    sql """DROP CATALOG IF EXISTS ${file_ctl_name}"""
+    sql """
+        CREATE CATALOG ${file_ctl_name} PROPERTIES (
+            "type" = "paimon",
+            "warehouse" = "hdfs://HDFS8000871/user/paimon",
+            "dfs.nameservices"="HDFS8000871",
+            "dfs.ha.namenodes.HDFS8000871"="nn1,nn2",
+            "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
+            "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
+            "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+            "hadoop.username"="hadoop"
+        );
+    """
+
+    // This is only for testing catalog creation
+    sql """DROP CATALOG IF EXISTS ${hms_ctl_name}"""
+    sql """
+        CREATE CATALOG ${hms_ctl_name} PROPERTIES (
+            "type" = "paimon",
+            "paimon.catalog.type"="hms",
+            "warehouse" = "hdfs://HDFS8000871/user/zhangdong/paimon2",
+            "hive.metastore.uris" = "thrift://172.21.0.44:7004",
+            "dfs.nameservices"="HDFS8000871",
+            "dfs.ha.namenodes.HDFS8000871"="nn1,nn2",
+            "dfs.namenode.rpc-address.HDFS8000871.nn1"="172.21.0.1:4007",
+            "dfs.namenode.rpc-address.HDFS8000871.nn2"="172.21.0.2:4007",
+            "dfs.client.failover.proxy.provider.HDFS8000871"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
+            "hadoop.username"="hadoop"
+        );
+    """
+}
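
The new regression test only checks that the two catalogs can be created. When run against a real cluster, the result could be inspected and cleaned up with statements like the following (a sketch, not part of the test itself):

```sql
SHOW CATALOGS;
SHOW CREATE CATALOG paimon_file_catalog;
DROP CATALOG IF EXISTS paimon_file_catalog;
DROP CATALOG IF EXISTS paimon_hms_catalog;
```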