From e357efe88ca6a4079206dc4143768192fe64a4d0 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Thu, 19 Dec 2024 21:24:46 +0800 Subject: [PATCH 1/8] rebase --- flink-connector/flink/build.gradle.kts | 13 +- .../paimon/GravitinoPaimonCatalog.java | 47 +++++ .../paimon/GravitinoPaimonCatalogFactory.java | 81 ++++++++ .../GravitinoPaimonCatalogFactoryOptions.java | 46 +++++ .../paimon/PaimonPropertiesConverter.java | 90 +++++++++ .../org.apache.flink.table.factories.Factory | 3 +- .../integration/test/FlinkCommonIT.java | 28 +++ .../test/paimon/FlinkPaimonCatalogIT.java | 189 ++++++++++++++++++ .../paimon/TestPaimonPropertiesConverter.java | 108 ++++++++++ 9 files changed, 603 insertions(+), 2 deletions(-) create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java create mode 100644 flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java create mode 100644 flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 9e2a48c036c..fafaa760701 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -26,6 +26,7 @@ repositories { mavenCentral() } +var paimonVersion: String = libs.versions.paimon.get() val flinkVersion: String = libs.versions.flink.get() val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") @@ -37,15 +38,22 @@ val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") val scalaVersion: String = "2.12" val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion" +tasks.compileJava { + dependsOn(":catalogs:catalog-lakehouse-paimon:runtimeJars") +} + dependencies { + implementation(project(":core")) implementation(project(":catalogs:catalog-common")) + implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) - + compileOnly(project(":catalogs:catalog-lakehouse-paimon")) compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") + compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion") compileOnly(libs.hive2.exec) { artifact { @@ -75,6 +83,7 @@ dependencies { testImplementation(project(":clients:client-java")) testImplementation(project(":core")) testImplementation(project(":common")) + testImplementation(project(":catalogs:catalog-lakehouse-paimon")) testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) @@ -90,6 +99,7 @@ dependencies { testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") testImplementation("org.apache.flink:flink-table-common:$flinkVersion") testImplementation("org.apache.flink:flink-table-api-java:$flinkVersion") + testImplementation("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion") testImplementation(libs.hive2.exec) { artifact { @@ -170,6 +180,7 @@ tasks.test { } else { dependsOn(tasks.jar) dependsOn(":catalogs:catalog-hive:jar") + dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java new file mode 100644 index 00000000000..f403c54e69e --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; + +/** + * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to + * proxy the PaimonCatalog class. + */ +public class GravitinoPaimonCatalog extends BaseCatalog { + + private AbstractCatalog paimonCatalog; + + protected GravitinoPaimonCatalog( + String catalogName, + AbstractCatalog paimonCatalog, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { + super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter); + } + + @Override + protected AbstractCatalog realCatalog() { + return paimonCatalog; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java new file mode 100644 index 00000000000..a56c1a2854d --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.ImmutableSet; +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.Catalog; +import org.apache.gravitino.flink.connector.DefaultPartitionConverter; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.paimon.flink.FlinkCatalog; +import org.apache.paimon.flink.FlinkCatalogFactory; + +/** + * Factory for creating instances of {@link GravitinoPaimonCatalog}. It will be created by SPI + * discovery in Flink. + */ +public class GravitinoPaimonCatalogFactory implements BaseCatalogFactory { + + @Override + public Catalog createCatalog(Context context) { + FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context); + return new GravitinoPaimonCatalog( + context.getName(), catalog, propertiesConverter(), partitionConverter()); + } + + @Override + public String factoryIdentifier() { + return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return ImmutableSet.of(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public String gravitinoCatalogProvider() { + return "lakehouse-paimon"; + } + + @Override + public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { + return org.apache.gravitino.Catalog.Type.RELATIONAL; + } + + @Override + public PropertiesConverter propertiesConverter() { + return PaimonPropertiesConverter.INSTANCE; + } + + @Override + public PartitionConverter partitionConverter() { + return DefaultPartitionConverter.INSTANCE; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java new file mode 100644 index 00000000000..c01b89714aa --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class GravitinoPaimonCatalogFactoryOptions { + + /** Identifier for the {@link GravitinoPaimonCatalog}. */ + public static final String IDENTIFIER = "gravitino-paimon"; + + public static ConfigOption CATALOG_BACKEND = + ConfigOptions.key("catalog.backend") + .stringType() + .defaultValue("fileSystem") + .withDescription(""); + + public static ConfigOption WAREHOUSE = + ConfigOptions.key("warehouse").stringType().noDefaultValue(); + + public static ConfigOption URI = ConfigOptions.key("uri").stringType().noDefaultValue(); + + public static ConfigOption JDBC_USER = + ConfigOptions.key("jdbc.user").stringType().noDefaultValue(); + + public static ConfigOption JDBC_PASSWORD = + ConfigOptions.key("jdbc.password").stringType().noDefaultValue(); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java new file mode 100644 index 00000000000..80285fb1619 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.Maps; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.paimon.options.CatalogOptions; + +public class PaimonPropertiesConverter implements PropertiesConverter { + + public static final PaimonPropertiesConverter INSTANCE = new PaimonPropertiesConverter(); + + private PaimonPropertiesConverter() {} + + @Override + public Map toGravitinoCatalogProperties(Configuration flinkConf) { + Map gravitinoCatalogProperties = flinkConf.toMap(); + String warehouse = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE); + gravitinoCatalogProperties.put(PaimonConfig.CATALOG_WAREHOUSE.getKey(), warehouse); + String backendType = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND); + gravitinoCatalogProperties.put( + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, backendType); + if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) { + gravitinoCatalogProperties.put( + PaimonConfig.CATALOG_URI.getKey(), + flinkConf.get(GravitinoPaimonCatalogFactoryOptions.URI)); + gravitinoCatalogProperties.put( + PaimonConfig.CATALOG_JDBC_USER.getKey(), + flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER)); + gravitinoCatalogProperties.put( + PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), + flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD)); + } else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) { + throw new UnsupportedOperationException( + "The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore."); + } + return gravitinoCatalogProperties; + } + + @Override + public Map toFlinkCatalogProperties(Map gravitinoProperties) { + Map flinkCatalogProperties = Maps.newHashMap(); + flinkCatalogProperties.putAll(gravitinoProperties); + String backendType = + flinkCatalogProperties.get(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND); + if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) { + flinkCatalogProperties.put(CatalogOptions.METASTORE.key(), backendType); + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.URI.key(), + gravitinoProperties.get(PaimonConfig.CATALOG_URI.getKey())); + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(), + gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(), + gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); + } else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) { + throw new UnsupportedOperationException( + "The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore."); + } + flinkCatalogProperties.put( + GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), backendType); + flinkCatalogProperties.put( + CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); + return flinkCatalogProperties; + } +} diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index c9d9c92b5ef..a535afb6dc2 100644 --- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -18,4 +18,5 @@ # org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory -org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory \ No newline at end of file +org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory +org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory \ No newline at end of file diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 2d022b4a8a4..3c9958ed3b6 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -53,12 +53,26 @@ import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; public abstract class FlinkCommonIT extends FlinkEnvIT { protected abstract Catalog currentCatalog(); + protected boolean supportSchemaOperation() { + return true; + } + + protected boolean supportTableOperation() { + return true; + } + + protected boolean supportColumnOperation() { + return true; + } + @Test + @EnabledIf("supportSchemaOperation") public void testCreateSchema() { doWithCatalog( currentCatalog(), @@ -76,6 +90,7 @@ public void testCreateSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testGetSchema() { doWithCatalog( currentCatalog(), @@ -110,6 +125,7 @@ public void testGetSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testListSchema() { doWithCatalog( currentCatalog(), @@ -150,6 +166,7 @@ public void testListSchema() { } @Test + @EnabledIf("supportSchemaOperation") public void testAlterSchema() { doWithCatalog( currentCatalog(), @@ -188,6 +205,7 @@ public void testAlterSchema() { } @Test + @EnabledIf("supportTableOperation") public void testCreateSimpleTable() { String databaseName = "test_create_no_partition_table_db"; String tableName = "test_create_no_partition_table"; @@ -236,6 +254,7 @@ public void testCreateSimpleTable() { } @Test + @EnabledIf("supportTableOperation") public void testListTables() { String newSchema = "test_list_table_catalog"; Column[] columns = new Column[] {Column.of("user_id", Types.IntegerType.get(), "USER_ID")}; @@ -268,6 +287,7 @@ public void testListTables() { } @Test + @EnabledIf("supportTableOperation") public void testDropTable() { String databaseName = "test_drop_table_db"; doWithSchema( @@ -289,6 +309,7 @@ public void testDropTable() { } @Test + @EnabledIf("supportTableOperation") public void testGetSimpleTable() { String databaseName = "test_get_simple_table"; Column[] columns = @@ -342,6 +363,7 @@ public void testGetSimpleTable() { } @Test + @EnabledIf("supportColumnOperation") public void testRenameColumn() { String databaseName = "test_rename_column_db"; String tableName = "test_rename_column"; @@ -377,6 +399,7 @@ public void testRenameColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableComment() { String databaseName = "test_alter_table_comment_database"; String tableName = "test_alter_table_comment"; @@ -436,6 +459,7 @@ public void testAlterTableComment() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableAddColumn() { String databaseName = "test_alter_table_add_column_db"; String tableName = "test_alter_table_add_column"; @@ -471,6 +495,7 @@ public void testAlterTableAddColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterTableDropColumn() { String databaseName = "test_alter_table_drop_column_db"; String tableName = "test_alter_table_drop_column"; @@ -501,6 +526,7 @@ public void testAlterTableDropColumn() { } @Test + @EnabledIf("supportColumnOperation") public void testAlterColumnTypeAndChangeOrder() { String databaseName = "test_alter_table_alter_column_db"; String tableName = "test_alter_table_rename_column"; @@ -542,6 +568,7 @@ public void testAlterColumnTypeAndChangeOrder() { } @Test + @EnabledIf("supportTableOperation") public void testRenameTable() { String databaseName = "test_rename_table_db"; String tableName = "test_rename_table"; @@ -569,6 +596,7 @@ public void testRenameTable() { } @Test + @EnabledIf("supportTableOperation") public void testAlterTableProperties() { String databaseName = "test_alter_table_properties_db"; String tableName = "test_alter_table_properties"; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java new file mode 100644 index 00000000000..f06746e98ca --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.integration.test.paimon; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Map; +import org.apache.flink.table.api.ResultKind; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.Schema; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +public class FlinkPaimonCatalogIT extends FlinkCommonIT { + + private static final String DEFAULT_PAIMON_CATALOG = + "test_flink_paimon_filesystem_schema_catalog"; + + private static org.apache.gravitino.Catalog catalog; + + @Override + protected boolean supportColumnOperation() { + return false; + } + + @Override + protected boolean supportTableOperation() { + return false; + } + + @Override + protected boolean supportSchemaOperation() { + return false; + } + + protected Catalog currentCatalog() { + return catalog; + } + + @BeforeAll + static void setup() { + initPaimonCatalog(); + } + + @AfterAll + static void stop() { + Preconditions.checkNotNull(metalake); + metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true); + } + + private static void initPaimonCatalog() { + Preconditions.checkNotNull(metalake); + catalog = + metalake.createCatalog( + DEFAULT_PAIMON_CATALOG, + org.apache.gravitino.Catalog.Type.RELATIONAL, + "lakehouse-paimon", + null, + ImmutableMap.of( + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + "filesystem", + "warehouse", + "/tmp/gravitino/paimon")); + } + + @Test + public void testCreateSchema() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_create_schema"; + try { + TableResult tableResult = sql("CREATE DATABASE IF NOT EXISTS %s", schema); + TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); + catalog.asSchemas().schemaExists(schema); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + public void testGetSchema() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_get_schema"; + try { + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); + TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); + + catalog.asSchemas().schemaExists(schema); + Schema loadedSchema = catalog.asSchemas().loadSchema(schema); + Assertions.assertEquals(schema, loadedSchema.name()); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + public void testListSchema() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_list_schema"; + String schema2 = "test_list_schema2"; + String schema3 = "test_list_schema3"; + + try { + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("SHOW DATABASES"), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of("default"), + Row.of(schema), + Row.of(schema2), + Row.of(schema3)); + + String[] schemas = catalog.asSchemas().listSchemas(); + Arrays.sort(schemas); + Assertions.assertEquals(4, schemas.length); + Assertions.assertEquals("default", schemas[0]); + Assertions.assertEquals(schema, schemas[1]); + Assertions.assertEquals(schema2, schemas[2]); + Assertions.assertEquals(schema3, schemas[3]); + } finally { + catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema2, true); + catalog.asSchemas().dropSchema(schema3, true); + Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); + } + }); + } + + @Test + public void testCreateGravitinoPaimonCatalogUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "gravitino_hive_sql"; + String warehouse = "/tmp/gravitino/paimon"; + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='gravitino-paimon', " + + "'warehouse'='%s'," + + "'catalog.backend'='filesystem'" + + ")", + catalogName, warehouse)); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(warehouse, properties.get("warehouse")); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java new file mode 100644 index 00000000000..5b37c5a3a97 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.flink.connector.paimon; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Test for {@link PaimonPropertiesConverter} */ +public class TestPaimonPropertiesConverter { + + private static final PaimonPropertiesConverter CONVERTER = PaimonPropertiesConverter.INSTANCE; + + private static final String localWarehouse = "file:///tmp/paimon_warehouse"; + + @Test + public void testToPaimonFileSystemCatalog() { + Map catalogProperties = ImmutableMap.of("warehouse", localWarehouse); + Map flinkCatalogProperties = + CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals( + GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals(localWarehouse, flinkCatalogProperties.get("warehouse")); + } + + @Test + public void testToPaimonJdbcCatalog() { + String testUser = "testUser"; + String testPassword = "testPassword"; + String testUri = "testUri"; + Map catalogProperties = + ImmutableMap.of( + PaimonConfig.CATALOG_WAREHOUSE.getKey(), + localWarehouse, + PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + "jdbc", + PaimonConfig.CATALOG_JDBC_USER.getKey(), + testUser, + PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), + testPassword, + PaimonConfig.CATALOG_URI.getKey(), + testUri); + Map flinkCatalogProperties = + CONVERTER.toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals( + GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals( + localWarehouse, + flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE.key())); + Assertions.assertEquals( + testUser, flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key())); + Assertions.assertEquals( + testPassword, + flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key())); + Assertions.assertEquals( + "jdbc", + flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key())); + Assertions.assertEquals( + testUri, flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.URI.key())); + } + + @Test + public void testToGravitinoCatalogProperties() { + String testUser = "testUser"; + String testPassword = "testPassword"; + String testUri = "testUri"; + Configuration configuration = + Configuration.fromMap( + ImmutableMap.of( + GravitinoPaimonCatalogFactoryOptions.WAREHOUSE.key(), + localWarehouse, + GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), + "jdbc", + GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(), + testUser, + GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(), + testPassword, + GravitinoPaimonCatalogFactoryOptions.URI.key(), + testUri)); + Map properties = CONVERTER.toGravitinoCatalogProperties(configuration); + Assertions.assertEquals( + localWarehouse, properties.get(PaimonConfig.CATALOG_WAREHOUSE.getKey())); + Assertions.assertEquals(testUser, properties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); + Assertions.assertEquals( + testPassword, properties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); + Assertions.assertEquals(testUri, properties.get(PaimonConfig.CATALOG_URI.getKey())); + } +} From 06a72c90d33245f7b6f62532b264d19c35aa7461 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Sat, 21 Dec 2024 22:31:02 +0800 Subject: [PATCH 2/8] test --- .../integration/test/FlinkCommonIT.java | 102 +++++++++++++++--- .../test/paimon/FlinkPaimonCatalogIT.java | 86 +-------------- 2 files changed, 86 insertions(+), 102 deletions(-) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 3c9958ed3b6..bef1f920fb8 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -71,6 +72,10 @@ protected boolean supportColumnOperation() { return true; } + protected boolean supportSchemaOperationWithCommentAndOptions() { + return true; + } + @Test @EnabledIf("supportSchemaOperation") public void testCreateSchema() { @@ -96,27 +101,14 @@ public void testGetSchema() { currentCatalog(), catalog -> { String schema = "test_get_schema"; - String comment = "test comment"; - String propertyKey = "key1"; - String propertyValue = "value1"; - String location = warehouse + "/" + schema; - try { TestUtils.assertTableResult( - sql( - "CREATE DATABASE IF NOT EXISTS %s COMMENT '%s' WITH ('%s'='%s', '%s'='%s')", - schema, comment, propertyKey, propertyValue, "location", location), - ResultKind.SUCCESS); + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); catalog.asSchemas().schemaExists(schema); Schema loadedSchema = catalog.asSchemas().loadSchema(schema); Assertions.assertEquals(schema, loadedSchema.name()); - Assertions.assertEquals(comment, loadedSchema.comment()); - Assertions.assertEquals(2, loadedSchema.properties().size()); - Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); - Assertions.assertEquals( - location, loadedSchema.properties().get(HiveConstants.LOCATION)); } finally { catalog.asSchemas().dropSchema(schema, true); Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); @@ -130,7 +122,6 @@ public void testListSchema() { doWithCatalog( currentCatalog(), catalog -> { - Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); String schema = "test_list_schema"; String schema2 = "test_list_schema2"; String schema3 = "test_list_schema3"; @@ -151,6 +142,7 @@ public void testListSchema() { Row.of(schema3)); String[] schemas = catalog.asSchemas().listSchemas(); + Arrays.sort(schemas); Assertions.assertEquals(4, schemas.length); Assertions.assertEquals("default", schemas[0]); Assertions.assertEquals(schema, schemas[1]); @@ -166,8 +158,8 @@ public void testListSchema() { } @Test - @EnabledIf("supportSchemaOperation") - public void testAlterSchema() { + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testAlterSchemaWithCommentAndOptions() { doWithCatalog( currentCatalog(), catalog -> { @@ -204,6 +196,82 @@ public void testAlterSchema() { }); } + @Test + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testGetSchemaWithCommentAndOptions() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_get_schema"; + String comment = "test comment"; + String propertyKey = "key1"; + String propertyValue = "value1"; + String location = warehouse + "/" + schema; + + try { + TestUtils.assertTableResult( + sql( + "CREATE DATABASE IF NOT EXISTS %s COMMENT '%s' WITH ('%s'='%s', '%s'='%s')", + schema, comment, propertyKey, propertyValue, "location", location), + ResultKind.SUCCESS); + TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); + + catalog.asSchemas().schemaExists(schema); + Schema loadedSchema = catalog.asSchemas().loadSchema(schema); + Assertions.assertEquals(schema, loadedSchema.name()); + Assertions.assertEquals(comment, loadedSchema.comment()); + Assertions.assertEquals(2, loadedSchema.properties().size()); + Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); + Assertions.assertEquals( + location, loadedSchema.properties().get(HiveConstants.LOCATION)); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testListSchemaWithCommentAndOptions() { + doWithCatalog( + currentCatalog(), + catalog -> { + Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); + String schema = "test_list_schema"; + String schema2 = "test_list_schema2"; + String schema3 = "test_list_schema3"; + + try { + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS); + TestUtils.assertTableResult( + sql("SHOW DATABASES"), + ResultKind.SUCCESS_WITH_CONTENT, + Row.of("default"), + Row.of(schema), + Row.of(schema2), + Row.of(schema3)); + + String[] schemas = catalog.asSchemas().listSchemas(); + Assertions.assertEquals(4, schemas.length); + Assertions.assertEquals("default", schemas[0]); + Assertions.assertEquals(schema, schemas[1]); + Assertions.assertEquals(schema2, schemas[2]); + Assertions.assertEquals(schema3, schemas[3]); + } finally { + catalog.asSchemas().dropSchema(schema, true); + catalog.asSchemas().dropSchema(schema2, true); + catalog.asSchemas().dropSchema(schema3, true); + Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); + } + }); + } + @Test @EnabledIf("supportTableOperation") public void testCreateSimpleTable() { diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index f06746e98ca..ec1ca31d71f 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -20,16 +20,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; -import java.util.Arrays; import java.util.Map; -import org.apache.flink.table.api.ResultKind; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.types.Row; import org.apache.gravitino.Catalog; -import org.apache.gravitino.Schema; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; -import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -53,7 +47,7 @@ protected boolean supportTableOperation() { } @Override - protected boolean supportSchemaOperation() { + protected boolean supportSchemaOperationWithCommentAndOptions() { return false; } @@ -87,84 +81,6 @@ private static void initPaimonCatalog() { "/tmp/gravitino/paimon")); } - @Test - public void testCreateSchema() { - doWithCatalog( - currentCatalog(), - catalog -> { - String schema = "test_create_schema"; - try { - TableResult tableResult = sql("CREATE DATABASE IF NOT EXISTS %s", schema); - TestUtils.assertTableResult(tableResult, ResultKind.SUCCESS); - catalog.asSchemas().schemaExists(schema); - } finally { - catalog.asSchemas().dropSchema(schema, true); - Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); - } - }); - } - - @Test - public void testGetSchema() { - doWithCatalog( - currentCatalog(), - catalog -> { - String schema = "test_get_schema"; - try { - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); - TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); - - catalog.asSchemas().schemaExists(schema); - Schema loadedSchema = catalog.asSchemas().loadSchema(schema); - Assertions.assertEquals(schema, loadedSchema.name()); - } finally { - catalog.asSchemas().dropSchema(schema, true); - Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); - } - }); - } - - @Test - public void testListSchema() { - doWithCatalog( - currentCatalog(), - catalog -> { - String schema = "test_list_schema"; - String schema2 = "test_list_schema2"; - String schema3 = "test_list_schema3"; - - try { - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS); - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS); - TestUtils.assertTableResult( - sql("SHOW DATABASES"), - ResultKind.SUCCESS_WITH_CONTENT, - Row.of("default"), - Row.of(schema), - Row.of(schema2), - Row.of(schema3)); - - String[] schemas = catalog.asSchemas().listSchemas(); - Arrays.sort(schemas); - Assertions.assertEquals(4, schemas.length); - Assertions.assertEquals("default", schemas[0]); - Assertions.assertEquals(schema, schemas[1]); - Assertions.assertEquals(schema2, schemas[2]); - Assertions.assertEquals(schema3, schemas[3]); - } finally { - catalog.asSchemas().dropSchema(schema, true); - catalog.asSchemas().dropSchema(schema2, true); - catalog.asSchemas().dropSchema(schema3, true); - Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); - } - }); - } - @Test public void testCreateGravitinoPaimonCatalogUsingSQL() { tableEnv.useCatalog(DEFAULT_CATALOG); From d9aed984f58e8d41827c0121ee9d17906e0fda3c Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 27 Dec 2024 14:27:41 +0800 Subject: [PATCH 3/8] fix npe --- .../flink/connector/paimon/GravitinoPaimonCatalog.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java index f403c54e69e..017ac6e7085 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -30,7 +30,7 @@ */ public class GravitinoPaimonCatalog extends BaseCatalog { - private AbstractCatalog paimonCatalog; + private final AbstractCatalog paimonCatalog; protected GravitinoPaimonCatalog( String catalogName, @@ -38,6 +38,7 @@ protected GravitinoPaimonCatalog( PropertiesConverter propertiesConverter, PartitionConverter partitionConverter) { super(catalogName, paimonCatalog.getDefaultDatabase(), propertiesConverter, partitionConverter); + this.paimonCatalog = paimonCatalog; } @Override From 48063a335ec8dfdf5054417442b4bdddb22c0571 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 10 Jan 2025 21:37:54 +0800 Subject: [PATCH 4/8] add bypass --- .../paimon/PaimonPropertiesUtils.java | 46 ++++++++---- flink-connector/flink/build.gradle.kts | 5 -- .../paimon/GravitinoPaimonCatalogFactory.java | 3 +- .../GravitinoPaimonCatalogFactoryOptions.java | 20 ----- .../paimon/PaimonPropertiesConverter.java | 75 ++++++++----------- .../store/GravitinoCatalogStore.java | 3 +- .../test/paimon/FlinkPaimonCatalogIT.java | 10 ++- .../paimon/TestPaimonPropertiesConverter.java | 37 +++++---- 8 files changed, 91 insertions(+), 108 deletions(-) diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java index 0dcf24f3a67..6c972b08390 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java @@ -32,25 +32,41 @@ public class PaimonPropertiesUtils { // will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will // change it to `catalogType` automatically and pass it to Paimon. public static final Map GRAVITINO_CONFIG_TO_PAIMON; + public static final Map PAIMON_CATALOG_CONFIG_TO_GRAVITINO; static { - Map map = new HashMap(); - map.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND); - map.put(PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER); - map.put(PaimonConstants.GRAVITINO_JDBC_USER, PaimonConstants.PAIMON_JDBC_USER); - map.put(PaimonConstants.GRAVITINO_JDBC_PASSWORD, PaimonConstants.PAIMON_JDBC_PASSWORD); - map.put(PaimonConstants.URI, PaimonConstants.URI); - map.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE); - map.put(PaimonConstants.CATALOG_BACKEND_NAME, PaimonConstants.CATALOG_BACKEND_NAME); + Map gravitinoConfigToPaimon = new HashMap<>(); + Map paimonCatalogConfigToGravitino = new HashMap<>(); + gravitinoConfigToPaimon.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.METASTORE); + gravitinoConfigToPaimon.put( + PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER); + gravitinoConfigToPaimon.put( + PaimonConstants.GRAVITINO_JDBC_USER, PaimonConstants.PAIMON_JDBC_USER); + gravitinoConfigToPaimon.put( + PaimonConstants.GRAVITINO_JDBC_PASSWORD, PaimonConstants.PAIMON_JDBC_PASSWORD); + gravitinoConfigToPaimon.put(PaimonConstants.URI, PaimonConstants.URI); + gravitinoConfigToPaimon.put(PaimonConstants.WAREHOUSE, PaimonConstants.WAREHOUSE); + gravitinoConfigToPaimon.put( + PaimonConstants.CATALOG_BACKEND_NAME, PaimonConstants.CATALOG_BACKEND_NAME); // S3 - map.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT); - map.put(S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, PaimonConstants.S3_ACCESS_KEY); - map.put(S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, PaimonConstants.S3_SECRET_KEY); + gravitinoConfigToPaimon.put(S3Properties.GRAVITINO_S3_ENDPOINT, PaimonConstants.S3_ENDPOINT); + gravitinoConfigToPaimon.put( + S3Properties.GRAVITINO_S3_ACCESS_KEY_ID, PaimonConstants.S3_ACCESS_KEY); + gravitinoConfigToPaimon.put( + S3Properties.GRAVITINO_S3_SECRET_ACCESS_KEY, PaimonConstants.S3_SECRET_KEY); // OSS - map.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, PaimonConstants.OSS_ENDPOINT); - map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, PaimonConstants.OSS_ACCESS_KEY); - map.put(OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, PaimonConstants.OSS_SECRET_KEY); - GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(map); + gravitinoConfigToPaimon.put(OSSProperties.GRAVITINO_OSS_ENDPOINT, PaimonConstants.OSS_ENDPOINT); + gravitinoConfigToPaimon.put( + OSSProperties.GRAVITINO_OSS_ACCESS_KEY_ID, PaimonConstants.OSS_ACCESS_KEY); + gravitinoConfigToPaimon.put( + OSSProperties.GRAVITINO_OSS_ACCESS_KEY_SECRET, PaimonConstants.OSS_SECRET_KEY); + GRAVITINO_CONFIG_TO_PAIMON = Collections.unmodifiableMap(gravitinoConfigToPaimon); + gravitinoConfigToPaimon.forEach( + (key, value) -> { + paimonCatalogConfigToGravitino.put(value, key); + }); + PAIMON_CATALOG_CONFIG_TO_GRAVITINO = + Collections.unmodifiableMap(paimonCatalogConfigToGravitino); } /** diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index fafaa760701..14329dd1c07 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -38,10 +38,6 @@ val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".") val scalaVersion: String = "2.12" val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion" -tasks.compileJava { - dependsOn(":catalogs:catalog-lakehouse-paimon:runtimeJars") -} - dependencies { implementation(project(":core")) implementation(project(":catalogs:catalog-common")) @@ -49,7 +45,6 @@ dependencies { implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) - compileOnly(project(":catalogs:catalog-lakehouse-paimon")) compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java index a56c1a2854d..52489fc667f 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java @@ -19,7 +19,6 @@ package org.apache.gravitino.flink.connector.paimon; -import com.google.common.collect.ImmutableSet; import java.util.Collections; import java.util.Set; import org.apache.flink.configuration.ConfigOption; @@ -51,7 +50,7 @@ public String factoryIdentifier() { @Override public Set> requiredOptions() { - return ImmutableSet.of(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND); + return Collections.emptySet(); } @Override diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java index c01b89714aa..dd78f96d24b 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java @@ -19,28 +19,8 @@ package org.apache.gravitino.flink.connector.paimon; -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - public class GravitinoPaimonCatalogFactoryOptions { /** Identifier for the {@link GravitinoPaimonCatalog}. */ public static final String IDENTIFIER = "gravitino-paimon"; - - public static ConfigOption CATALOG_BACKEND = - ConfigOptions.key("catalog.backend") - .stringType() - .defaultValue("fileSystem") - .withDescription(""); - - public static ConfigOption WAREHOUSE = - ConfigOptions.key("warehouse").stringType().noDefaultValue(); - - public static ConfigOption URI = ConfigOptions.key("uri").stringType().noDefaultValue(); - - public static ConfigOption JDBC_USER = - ConfigOptions.key("jdbc.user").stringType().noDefaultValue(); - - public static ConfigOption JDBC_PASSWORD = - ConfigOptions.key("jdbc.password").stringType().noDefaultValue(); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 80285fb1619..36a9e0f9a1a 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -23,11 +23,10 @@ import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.CommonCatalogOptions; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; -import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.catalog.FileSystemCatalogFactory; public class PaimonPropertiesConverter implements PropertiesConverter { @@ -37,54 +36,42 @@ private PaimonPropertiesConverter() {} @Override public Map toGravitinoCatalogProperties(Configuration flinkConf) { - Map gravitinoCatalogProperties = flinkConf.toMap(); - String warehouse = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE); - gravitinoCatalogProperties.put(PaimonConfig.CATALOG_WAREHOUSE.getKey(), warehouse); - String backendType = flinkConf.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND); - gravitinoCatalogProperties.put( - PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, backendType); - if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) { - gravitinoCatalogProperties.put( - PaimonConfig.CATALOG_URI.getKey(), - flinkConf.get(GravitinoPaimonCatalogFactoryOptions.URI)); - gravitinoCatalogProperties.put( - PaimonConfig.CATALOG_JDBC_USER.getKey(), - flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER)); - gravitinoCatalogProperties.put( - PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), - flinkConf.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD)); - } else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) { - throw new UnsupportedOperationException( - "The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore."); + Map gravitinoProperties = Maps.newHashMap(); + Map flinkConfMap = flinkConf.toMap(); + for (Map.Entry entry : flinkConfMap.entrySet()) { + String gravitinoKey = + PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey()); + if (gravitinoKey != null) { + gravitinoProperties.put(gravitinoKey, entry.getValue()); + } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { + gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); + } else { + gravitinoProperties.put(entry.getKey(), entry.getValue()); + } } - return gravitinoCatalogProperties; + gravitinoProperties.put( + PaimonConstants.CATALOG_BACKEND, + flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER)); + return gravitinoProperties; } @Override public Map toFlinkCatalogProperties(Map gravitinoProperties) { Map flinkCatalogProperties = Maps.newHashMap(); - flinkCatalogProperties.putAll(gravitinoProperties); - String backendType = - flinkCatalogProperties.get(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND); - if (PaimonCatalogBackend.JDBC.name().equalsIgnoreCase(backendType)) { - flinkCatalogProperties.put(CatalogOptions.METASTORE.key(), backendType); - flinkCatalogProperties.put( - GravitinoPaimonCatalogFactoryOptions.URI.key(), - gravitinoProperties.get(PaimonConfig.CATALOG_URI.getKey())); - flinkCatalogProperties.put( - GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(), - gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); - flinkCatalogProperties.put( - GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(), - gravitinoProperties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); - } else if (PaimonCatalogBackend.HIVE.name().equalsIgnoreCase(backendType)) { - throw new UnsupportedOperationException( - "The Gravitino Connector does not currently support creating a Paimon Catalog that uses Hive Metastore."); - } - flinkCatalogProperties.put( - GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), backendType); flinkCatalogProperties.put( CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); + gravitinoProperties.forEach( + (key, value) -> { + String flinkConfigKey = key; + if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { + flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); + } + flinkCatalogProperties.put( + PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.getOrDefault( + flinkConfigKey, flinkConfigKey), + value); + }); + return flinkCatalogProperties; } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java index 92e778ce297..4c29b7fde3b 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java @@ -54,7 +54,8 @@ public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) { public void storeCatalog(String catalogName, CatalogDescriptor descriptor) throws CatalogException { Configuration configuration = descriptor.getConfiguration(); - BaseCatalogFactory catalogFactory = getCatalogFactory(configuration.toMap()); + Map gravitino = configuration.toMap(); + BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino); Map gravitinoProperties = catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration); gravitinoCatalogManager.createCatalog( diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index ec1ca31d71f..c10f843f753 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import java.nio.file.Path; import java.util.Map; import org.apache.gravitino.Catalog; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; @@ -27,10 +28,15 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +@Tag("gravitino-docker-test") public class FlinkPaimonCatalogIT extends FlinkCommonIT { + @TempDir private static Path warehouseDir; + private static final String DEFAULT_PAIMON_CATALOG = "test_flink_paimon_filesystem_schema_catalog"; @@ -78,7 +84,7 @@ private static void initPaimonCatalog() { PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, "filesystem", "warehouse", - "/tmp/gravitino/paimon")); + warehouseDir.toString())); } @Test @@ -86,7 +92,7 @@ public void testCreateGravitinoPaimonCatalogUsingSQL() { tableEnv.useCatalog(DEFAULT_CATALOG); int numCatalogs = tableEnv.listCatalogs().length; String catalogName = "gravitino_hive_sql"; - String warehouse = "/tmp/gravitino/paimon"; + String warehouse = warehouseDir.toString(); tableEnv.executeSql( String.format( "create catalog %s with (" diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java index 5b37c5a3a97..47673bc2f18 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -59,24 +60,20 @@ public void testToPaimonJdbcCatalog() { PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), testPassword, PaimonConfig.CATALOG_URI.getKey(), - testUri); + testUri, + "flink.bypass.key", + "value"); Map flinkCatalogProperties = CONVERTER.toFlinkCatalogProperties(catalogProperties); Assertions.assertEquals( GravitinoPaimonCatalogFactoryOptions.IDENTIFIER, flinkCatalogProperties.get("type")); + Assertions.assertEquals(localWarehouse, flinkCatalogProperties.get(PaimonConstants.WAREHOUSE)); + Assertions.assertEquals(testUser, flinkCatalogProperties.get(PaimonConstants.PAIMON_JDBC_USER)); Assertions.assertEquals( - localWarehouse, - flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.WAREHOUSE.key())); - Assertions.assertEquals( - testUser, flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key())); - Assertions.assertEquals( - testPassword, - flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key())); - Assertions.assertEquals( - "jdbc", - flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key())); - Assertions.assertEquals( - testUri, flinkCatalogProperties.get(GravitinoPaimonCatalogFactoryOptions.URI.key())); + testPassword, flinkCatalogProperties.get(PaimonConstants.PAIMON_JDBC_PASSWORD)); + Assertions.assertEquals("jdbc", flinkCatalogProperties.get(PaimonConstants.METASTORE)); + Assertions.assertEquals(testUri, flinkCatalogProperties.get(PaimonConstants.URI)); + Assertions.assertEquals("value", flinkCatalogProperties.get("key")); } @Test @@ -84,18 +81,19 @@ public void testToGravitinoCatalogProperties() { String testUser = "testUser"; String testPassword = "testPassword"; String testUri = "testUri"; + String testBackend = "jdbc"; Configuration configuration = Configuration.fromMap( ImmutableMap.of( - GravitinoPaimonCatalogFactoryOptions.WAREHOUSE.key(), + PaimonConstants.WAREHOUSE, localWarehouse, - GravitinoPaimonCatalogFactoryOptions.CATALOG_BACKEND.key(), - "jdbc", - GravitinoPaimonCatalogFactoryOptions.JDBC_USER.key(), + PaimonConstants.METASTORE, + testBackend, + PaimonConstants.PAIMON_JDBC_USER, testUser, - GravitinoPaimonCatalogFactoryOptions.JDBC_PASSWORD.key(), + PaimonConstants.PAIMON_JDBC_PASSWORD, testPassword, - GravitinoPaimonCatalogFactoryOptions.URI.key(), + PaimonConstants.URI, testUri)); Map properties = CONVERTER.toGravitinoCatalogProperties(configuration); Assertions.assertEquals( @@ -104,5 +102,6 @@ public void testToGravitinoCatalogProperties() { Assertions.assertEquals( testPassword, properties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); Assertions.assertEquals(testUri, properties.get(PaimonConfig.CATALOG_URI.getKey())); + Assertions.assertEquals(testBackend, properties.get(PaimonConstants.CATALOG_BACKEND)); } } From f61a5356d67e2380badb872d998f086a8b1d10ee Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Fri, 10 Jan 2025 21:53:48 +0800 Subject: [PATCH 5/8] fix spark ci bug --- .../connector/paimon/PaimonPropertiesConverter.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java index f713ca89ddd..1544dfeec8c 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; -import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.spark.connector.PropertiesConverter; @@ -42,14 +41,7 @@ public static PaimonPropertiesConverter getInstance() { @Override public Map toSparkCatalogProperties(Map properties) { Preconditions.checkArgument(properties != null, "Paimon Catalog properties should not be null"); - Map all = PaimonPropertiesUtils.toPaimonCatalogProperties(properties); - String catalogBackend = all.remove(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND); - Preconditions.checkArgument( - StringUtils.isNotBlank(catalogBackend), - String.format( - "%s should not be empty", PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND)); - all.put(PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, catalogBackend); - return all; + return PaimonPropertiesUtils.toPaimonCatalogProperties(properties); } @Override From e419f0856af54b890acb247790d7d37c8926cb10 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Sun, 12 Jan 2025 17:25:31 +0800 Subject: [PATCH 6/8] remove paimon catalog dependency --- flink-connector/flink/build.gradle.kts | 1 - .../test/paimon/FlinkPaimonCatalogIT.java | 7 +++--- .../paimon/TestPaimonPropertiesConverter.java | 22 ++++++++----------- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 14329dd1c07..3d2de633632 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -78,7 +78,6 @@ dependencies { testImplementation(project(":clients:client-java")) testImplementation(project(":core")) testImplementation(project(":common")) - testImplementation(project(":catalogs:catalog-lakehouse-paimon")) testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index c10f843f753..6b6457b2cc7 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -23,16 +23,15 @@ import java.nio.file.Path; import java.util.Map; import org.apache.gravitino.Catalog; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -@Tag("gravitino-docker-test") +// @Tag("gravitino-docker-test") public class FlinkPaimonCatalogIT extends FlinkCommonIT { @TempDir private static Path warehouseDir; @@ -81,7 +80,7 @@ private static void initPaimonCatalog() { "lakehouse-paimon", null, ImmutableMap.of( - PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + PaimonConstants.CATALOG_BACKEND, "filesystem", "warehouse", warehouseDir.toString())); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java index 47673bc2f18..ae2adce8fbb 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java @@ -21,8 +21,6 @@ import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.flink.configuration.Configuration; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; -import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConfig; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -51,15 +49,15 @@ public void testToPaimonJdbcCatalog() { String testUri = "testUri"; Map catalogProperties = ImmutableMap.of( - PaimonConfig.CATALOG_WAREHOUSE.getKey(), + PaimonConstants.WAREHOUSE, localWarehouse, - PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, + PaimonConstants.CATALOG_BACKEND, "jdbc", - PaimonConfig.CATALOG_JDBC_USER.getKey(), + PaimonConstants.GRAVITINO_JDBC_USER, testUser, - PaimonConfig.CATALOG_JDBC_PASSWORD.getKey(), + PaimonConstants.GRAVITINO_JDBC_PASSWORD, testPassword, - PaimonConfig.CATALOG_URI.getKey(), + PaimonConstants.URI, testUri, "flink.bypass.key", "value"); @@ -96,12 +94,10 @@ public void testToGravitinoCatalogProperties() { PaimonConstants.URI, testUri)); Map properties = CONVERTER.toGravitinoCatalogProperties(configuration); - Assertions.assertEquals( - localWarehouse, properties.get(PaimonConfig.CATALOG_WAREHOUSE.getKey())); - Assertions.assertEquals(testUser, properties.get(PaimonConfig.CATALOG_JDBC_USER.getKey())); - Assertions.assertEquals( - testPassword, properties.get(PaimonConfig.CATALOG_JDBC_PASSWORD.getKey())); - Assertions.assertEquals(testUri, properties.get(PaimonConfig.CATALOG_URI.getKey())); + Assertions.assertEquals(localWarehouse, properties.get(PaimonConstants.WAREHOUSE)); + Assertions.assertEquals(testUser, properties.get(PaimonConstants.GRAVITINO_JDBC_USER)); + Assertions.assertEquals(testPassword, properties.get(PaimonConstants.GRAVITINO_JDBC_PASSWORD)); + Assertions.assertEquals(testUri, properties.get(PaimonConstants.URI)); Assertions.assertEquals(testBackend, properties.get(PaimonConstants.CATALOG_BACKEND)); } } From 8ba1b944c2f770adf1cfcf1b736394edd2db7aff Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Mon, 13 Jan 2025 16:32:40 +0800 Subject: [PATCH 7/8] Optimized code --- flink-connector/flink/build.gradle.kts | 2 - .../paimon/PaimonPropertiesConverter.java | 17 ++- .../integration/test/FlinkCommonIT.java | 127 ++++++------------ .../test/paimon/FlinkPaimonCatalogIT.java | 3 +- .../paimon/TestPaimonPropertiesConverter.java | 8 +- 5 files changed, 56 insertions(+), 101 deletions(-) diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 3d2de633632..f137a3eae1b 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -41,7 +41,6 @@ val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion dependencies { implementation(project(":core")) implementation(project(":catalogs:catalog-common")) - implementation(libs.guava) compileOnly(project(":clients:client-java-runtime", configuration = "shadow")) @@ -174,7 +173,6 @@ tasks.test { } else { dependsOn(tasks.jar) dependsOn(":catalogs:catalog-hive:jar") - dependsOn(":catalogs:catalog-lakehouse-paimon:jar") } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 36a9e0f9a1a..eb0dabb544f 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -20,6 +20,7 @@ package org.apache.gravitino.flink.connector.paimon; import com.google.common.collect.Maps; +import java.util.HashMap; import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.CommonCatalogOptions; @@ -57,21 +58,19 @@ public Map toGravitinoCatalogProperties(Configuration flinkConf) @Override public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map flinkCatalogProperties = Maps.newHashMap(); - flinkCatalogProperties.put( - CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); + Map all = new HashMap<>(); gravitinoProperties.forEach( (key, value) -> { String flinkConfigKey = key; if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); } - flinkCatalogProperties.put( - PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.getOrDefault( - flinkConfigKey, flinkConfigKey), - value); + all.put(flinkConfigKey, value); }); - - return flinkCatalogProperties; + Map paimonCatalogProperties = + PaimonPropertiesUtils.toPaimonCatalogProperties(all); + paimonCatalogProperties.put( + CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); + return paimonCatalogProperties; } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index bef1f920fb8..674554b5667 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -60,10 +60,6 @@ public abstract class FlinkCommonIT extends FlinkEnvIT { protected abstract Catalog currentCatalog(); - protected boolean supportSchemaOperation() { - return true; - } - protected boolean supportTableOperation() { return true; } @@ -76,8 +72,12 @@ protected boolean supportSchemaOperationWithCommentAndOptions() { return true; } + protected boolean supportSchemaOperationWithoutCommentAndOption() { + return true; + } + @Test - @EnabledIf("supportSchemaOperation") + @EnabledIf("supportSchemaOperationWithoutCommentAndOption") public void testCreateSchema() { doWithCatalog( currentCatalog(), @@ -95,8 +95,8 @@ public void testCreateSchema() { } @Test - @EnabledIf("supportSchemaOperation") - public void testGetSchema() { + @EnabledIf("supportSchemaOperationWithoutCommentAndOption") + public void testGetSchemaWithoutCommentAndOption() { doWithCatalog( currentCatalog(), catalog -> { @@ -117,7 +117,42 @@ public void testGetSchema() { } @Test - @EnabledIf("supportSchemaOperation") + @EnabledIf("supportSchemaOperationWithCommentAndOptions") + public void testGetSchemaWithCommentAndOptions() { + doWithCatalog( + currentCatalog(), + catalog -> { + String schema = "test_get_schema"; + String comment = "test comment"; + String propertyKey = "key1"; + String propertyValue = "value1"; + String location = warehouse + "/" + schema; + + try { + TestUtils.assertTableResult( + sql( + "CREATE DATABASE IF NOT EXISTS %s COMMENT '%s' WITH ('%s'='%s', '%s'='%s')", + schema, comment, propertyKey, propertyValue, "location", location), + ResultKind.SUCCESS); + TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); + + catalog.asSchemas().schemaExists(schema); + Schema loadedSchema = catalog.asSchemas().loadSchema(schema); + Assertions.assertEquals(schema, loadedSchema.name()); + Assertions.assertEquals(comment, loadedSchema.comment()); + Assertions.assertEquals(2, loadedSchema.properties().size()); + Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); + Assertions.assertEquals( + location, loadedSchema.properties().get(HiveConstants.LOCATION)); + } finally { + catalog.asSchemas().dropSchema(schema, true); + Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); + } + }); + } + + @Test + @EnabledIf("supportSchemaOperationWithoutCommentAndOption") public void testListSchema() { doWithCatalog( currentCatalog(), @@ -196,82 +231,6 @@ public void testAlterSchemaWithCommentAndOptions() { }); } - @Test - @EnabledIf("supportSchemaOperationWithCommentAndOptions") - public void testGetSchemaWithCommentAndOptions() { - doWithCatalog( - currentCatalog(), - catalog -> { - String schema = "test_get_schema"; - String comment = "test comment"; - String propertyKey = "key1"; - String propertyValue = "value1"; - String location = warehouse + "/" + schema; - - try { - TestUtils.assertTableResult( - sql( - "CREATE DATABASE IF NOT EXISTS %s COMMENT '%s' WITH ('%s'='%s', '%s'='%s')", - schema, comment, propertyKey, propertyValue, "location", location), - ResultKind.SUCCESS); - TestUtils.assertTableResult(tableEnv.executeSql("USE " + schema), ResultKind.SUCCESS); - - catalog.asSchemas().schemaExists(schema); - Schema loadedSchema = catalog.asSchemas().loadSchema(schema); - Assertions.assertEquals(schema, loadedSchema.name()); - Assertions.assertEquals(comment, loadedSchema.comment()); - Assertions.assertEquals(2, loadedSchema.properties().size()); - Assertions.assertEquals(propertyValue, loadedSchema.properties().get(propertyKey)); - Assertions.assertEquals( - location, loadedSchema.properties().get(HiveConstants.LOCATION)); - } finally { - catalog.asSchemas().dropSchema(schema, true); - Assertions.assertFalse(catalog.asSchemas().schemaExists(schema)); - } - }); - } - - @Test - @EnabledIf("supportSchemaOperationWithCommentAndOptions") - public void testListSchemaWithCommentAndOptions() { - doWithCatalog( - currentCatalog(), - catalog -> { - Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); - String schema = "test_list_schema"; - String schema2 = "test_list_schema2"; - String schema3 = "test_list_schema3"; - - try { - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS); - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS); - TestUtils.assertTableResult( - sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS); - TestUtils.assertTableResult( - sql("SHOW DATABASES"), - ResultKind.SUCCESS_WITH_CONTENT, - Row.of("default"), - Row.of(schema), - Row.of(schema2), - Row.of(schema3)); - - String[] schemas = catalog.asSchemas().listSchemas(); - Assertions.assertEquals(4, schemas.length); - Assertions.assertEquals("default", schemas[0]); - Assertions.assertEquals(schema, schemas[1]); - Assertions.assertEquals(schema2, schemas[2]); - Assertions.assertEquals(schema3, schemas[3]); - } finally { - catalog.asSchemas().dropSchema(schema, true); - catalog.asSchemas().dropSchema(schema2, true); - catalog.asSchemas().dropSchema(schema3, true); - Assertions.assertEquals(1, catalog.asSchemas().listSchemas().length); - } - }); - } - @Test @EnabledIf("supportTableOperation") public void testCreateSimpleTable() { diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 6b6457b2cc7..10fab3567a3 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -28,10 +28,11 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; -// @Tag("gravitino-docker-test") +@Tag("gravitino-docker-test") public class FlinkPaimonCatalogIT extends FlinkCommonIT { @TempDir private static Path warehouseDir; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java index ae2adce8fbb..4496d94c0a4 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestPaimonPropertiesConverter.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; +import org.apache.gravitino.flink.connector.PropertiesConverter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -57,10 +58,8 @@ public void testToPaimonJdbcCatalog() { testUser, PaimonConstants.GRAVITINO_JDBC_PASSWORD, testPassword, - PaimonConstants.URI, - testUri, - "flink.bypass.key", - "value"); + PropertiesConverter.FLINK_PROPERTY_PREFIX + PaimonConstants.URI, + testUri); Map flinkCatalogProperties = CONVERTER.toFlinkCatalogProperties(catalogProperties); Assertions.assertEquals( @@ -71,7 +70,6 @@ public void testToPaimonJdbcCatalog() { testPassword, flinkCatalogProperties.get(PaimonConstants.PAIMON_JDBC_PASSWORD)); Assertions.assertEquals("jdbc", flinkCatalogProperties.get(PaimonConstants.METASTORE)); Assertions.assertEquals(testUri, flinkCatalogProperties.get(PaimonConstants.URI)); - Assertions.assertEquals("value", flinkCatalogProperties.get("key")); } @Test From 73fffc85078f3f8a97bbca40d0ec8837b3207634 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Mon, 13 Jan 2025 21:52:15 +0800 Subject: [PATCH 8/8] Optimized code --- .../lakehouse/paimon/PaimonPropertiesUtils.java | 2 +- .../connector/paimon/PaimonPropertiesConverter.java | 4 ++++ .../connector/integration/test/FlinkCommonIT.java | 7 ------- .../connector/paimon/PaimonPropertiesConverter.java | 10 +++++++++- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java index 6c972b08390..7b1832fe56d 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/paimon/PaimonPropertiesUtils.java @@ -37,7 +37,7 @@ public class PaimonPropertiesUtils { static { Map gravitinoConfigToPaimon = new HashMap<>(); Map paimonCatalogConfigToGravitino = new HashMap<>(); - gravitinoConfigToPaimon.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.METASTORE); + gravitinoConfigToPaimon.put(PaimonConstants.CATALOG_BACKEND, PaimonConstants.CATALOG_BACKEND); gravitinoConfigToPaimon.put( PaimonConstants.GRAVITINO_JDBC_DRIVER, PaimonConstants.GRAVITINO_JDBC_DRIVER); gravitinoConfigToPaimon.put( diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index eb0dabb544f..58613bee37d 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -69,6 +69,10 @@ public Map toFlinkCatalogProperties(Map gravitin }); Map paimonCatalogProperties = PaimonPropertiesUtils.toPaimonCatalogProperties(all); + paimonCatalogProperties.put( + PaimonConstants.METASTORE, + paimonCatalogProperties.getOrDefault( + PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER)); paimonCatalogProperties.put( CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); return paimonCatalogProperties; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 674554b5667..5a363e4e51b 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -72,12 +72,7 @@ protected boolean supportSchemaOperationWithCommentAndOptions() { return true; } - protected boolean supportSchemaOperationWithoutCommentAndOption() { - return true; - } - @Test - @EnabledIf("supportSchemaOperationWithoutCommentAndOption") public void testCreateSchema() { doWithCatalog( currentCatalog(), @@ -95,7 +90,6 @@ public void testCreateSchema() { } @Test - @EnabledIf("supportSchemaOperationWithoutCommentAndOption") public void testGetSchemaWithoutCommentAndOption() { doWithCatalog( currentCatalog(), @@ -152,7 +146,6 @@ public void testGetSchemaWithCommentAndOptions() { } @Test - @EnabledIf("supportSchemaOperationWithoutCommentAndOption") public void testListSchema() { doWithCatalog( currentCatalog(), diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java index 1544dfeec8c..f713ca89ddd 100644 --- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java +++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/paimon/PaimonPropertiesConverter.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.spark.connector.PropertiesConverter; @@ -41,7 +42,14 @@ public static PaimonPropertiesConverter getInstance() { @Override public Map toSparkCatalogProperties(Map properties) { Preconditions.checkArgument(properties != null, "Paimon Catalog properties should not be null"); - return PaimonPropertiesUtils.toPaimonCatalogProperties(properties); + Map all = PaimonPropertiesUtils.toPaimonCatalogProperties(properties); + String catalogBackend = all.remove(PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogBackend), + String.format( + "%s should not be empty", PaimonPropertiesConstants.GRAVITINO_PAIMON_CATALOG_BACKEND)); + all.put(PaimonPropertiesConstants.PAIMON_CATALOG_METASTORE, catalogBackend); + return all; } @Override