Skip to content

Commit

Permalink
[#5194] feat(flink): Support basic table DDL Operation for paimon-cat…
Browse files Browse the repository at this point in the history
…alog (#6224)

### What changes were proposed in this pull request?

Support basic table DDL Operation for paimon-catalog

### Why are the changes needed?

Fix: #5194

### Does this PR introduce _any_ user-facing change?

None.

### How was this patch tested?


org.apache.gravitino.flink.connector.integration.test.paimon.FlinkPaimonCatalogIT
  • Loading branch information
hdygxsj authored and web-flow committed Jan 15, 2025
1 parent 59d4499 commit 3c73ac0
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -656,11 +656,11 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u
return schemaChanges.toArray(new SchemaChange[0]);
}

private Catalog catalog() {
protected Catalog catalog() {
return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
}

private String catalogName() {
protected String catalogName() {
return getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@

package org.apache.gravitino.flink.connector.paimon;

import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.paimon.flink.FlinkTableFactory;

/**
* The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to
Expand All @@ -45,4 +52,21 @@ protected GravitinoPaimonCatalog(
protected AbstractCatalog realCatalog() {
return paimonCatalog;
}

@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
boolean dropped =
catalog()
.asTableCatalog()
.purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
if (!dropped && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}

@Override
public Optional<Factory> getFactory() {
return Optional.of(new FlinkTableFactory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.gravitino.flink.connector.integration.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
Expand Down Expand Up @@ -159,17 +158,14 @@ protected TableResult sql(@FormatString String sql, Object... args) {
return tableEnv.executeSql(String.format(sql, args));
}

protected static void doWithSchema(
protected void doWithSchema(
Catalog catalog, String schemaName, Consumer<Catalog> action, boolean dropSchema) {
Preconditions.checkNotNull(catalog);
Preconditions.checkNotNull(schemaName);
try {
tableEnv.useCatalog(catalog.name());
if (!catalog.asSchemas().schemaExists(schemaName)) {
catalog
.asSchemas()
.createSchema(
schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName));
catalog.asSchemas().createSchema(schemaName, null, null);
}
tableEnv.useDatabase(schemaName);
action.accept(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
Expand Down Expand Up @@ -586,4 +587,28 @@ public void testGetHiveTable() {
protected org.apache.gravitino.Catalog currentCatalog() {
return hiveCatalog;
}

protected void doWithSchema(
org.apache.gravitino.Catalog catalog,
String schemaName,
Consumer<org.apache.gravitino.Catalog> action,
boolean dropSchema) {
Preconditions.checkNotNull(catalog);
Preconditions.checkNotNull(schemaName);
try {
tableEnv.useCatalog(catalog.name());
if (!catalog.asSchemas().schemaExists(schemaName)) {
catalog
.asSchemas()
.createSchema(
schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName));
}
tableEnv.useDatabase(schemaName);
action.accept(catalog);
} finally {
if (dropSchema) {
catalog.asSchemas().dropSchema(schemaName, true);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog catalog;

@Override
protected boolean supportColumnOperation() {
return false;
}

@Override
protected boolean supportTableOperation() {
return false;
}

@Override
protected boolean supportSchemaOperationWithCommentAndOptions() {
return false;
Expand Down

0 comments on commit 3c73ac0

Please sign in to comment.