From 3c73ac07c84b1a10b21659f54a598db53c2898ec Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Wed, 15 Jan 2025 18:20:25 +0800 Subject: [PATCH] [#5194] feat(flink): Support basic table DDL Operation for paimon-catalog (#6224) ### What changes were proposed in this pull request? Support basic table DDL Operation for paimon-catalog ### Why are the changes needed? Fix: #5194 ### Does this PR introduce _any_ user-facing change? None. ### How was this patch tested? org.apache.gravitino.flink.connector.integration.test.paimon.FlinkPaimonCatalogIT --- .../flink/connector/catalog/BaseCatalog.java | 4 +-- .../paimon/GravitinoPaimonCatalog.java | 24 ++++++++++++++++++ .../integration/test/FlinkEnvIT.java | 8 ++---- .../test/hive/FlinkHiveCatalogIT.java | 25 +++++++++++++++++++ .../test/paimon/FlinkPaimonCatalogIT.java | 10 -------- 5 files changed, 53 insertions(+), 18 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 1496742177f..fd8e118ee49 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -656,11 +656,11 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u return schemaChanges.toArray(new SchemaChange[0]); } - private Catalog catalog() { + protected Catalog catalog() { return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName()); } - private String catalogName() { + protected String catalogName() { return getName(); } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java index 017ac6e7085..c22e00fa122 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -19,10 +19,17 @@ package org.apache.gravitino.flink.connector.paimon; +import java.util.Optional; import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.paimon.flink.FlinkTableFactory; /** * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to @@ -45,4 +52,21 @@ protected GravitinoPaimonCatalog( protected AbstractCatalog realCatalog() { return paimonCatalog; } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + boolean dropped = + catalog() + .asTableCatalog() + .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + if (!dropped && !ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } + } + + @Override + public Optional getFactory() { + return Optional.of(new FlinkTableFactory()); + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 5ae8847c6c1..f56b5297e17 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -19,7 +19,6 @@ package org.apache.gravitino.flink.connector.integration.test; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; @@ -159,17 +158,14 @@ protected TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } - protected static void doWithSchema( + protected void doWithSchema( Catalog catalog, String schemaName, Consumer action, boolean dropSchema) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + catalog.asSchemas().createSchema(schemaName, null, null); } tableEnv.useDatabase(schemaName); action.accept(catalog); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 333aa83f0b6..bb7b25f6b20 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -586,4 +587,28 @@ public void testGetHiveTable() { protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } + + protected void doWithSchema( + org.apache.gravitino.Catalog catalog, + String schemaName, + Consumer action, + boolean dropSchema) { + Preconditions.checkNotNull(catalog); + Preconditions.checkNotNull(schemaName); + try { + tableEnv.useCatalog(catalog.name()); + if (!catalog.asSchemas().schemaExists(schemaName)) { + catalog + .asSchemas() + .createSchema( + schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + } + tableEnv.useDatabase(schemaName); + action.accept(catalog); + } finally { + if (dropSchema) { + catalog.asSchemas().dropSchema(schemaName, true); + } + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 10fab3567a3..57a17c2a114 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog catalog; - @Override - protected boolean supportColumnOperation() { - return false; - } - - @Override - protected boolean supportTableOperation() { - return false; - } - @Override protected boolean supportSchemaOperationWithCommentAndOptions() { return false;