Skip to content

Commit

Permalink
paimon table ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
hdygxsj committed Jan 15, 2025
1 parent e4151e9 commit 0b1ace7
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -656,11 +656,11 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u
return schemaChanges.toArray(new SchemaChange[0]);
}

private Catalog catalog() {
protected Catalog catalog() {
return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName());
}

private String catalogName() {
protected String catalogName() {
return getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@

package org.apache.gravitino.flink.connector.paimon;

import java.util.Optional;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
import org.apache.paimon.flink.FlinkTableFactory;

/**
* The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to
Expand All @@ -45,4 +52,21 @@ protected GravitinoPaimonCatalog(
protected AbstractCatalog realCatalog() {
return paimonCatalog;
}

@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
boolean dropped =
catalog()
.asTableCatalog()
.purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
if (!dropped && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName(), tablePath);
}
}

@Override
public Optional<Factory> getFactory() {
return Optional.of(new FlinkTableFactory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.gravitino.flink.connector.integration.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
Expand Down Expand Up @@ -159,17 +159,18 @@ protected TableResult sql(@FormatString String sql, Object... args) {
return tableEnv.executeSql(String.format(sql, args));
}

protected static void doWithSchema(
protected Map<String, String> schemaOptions(String schemaName) {
return null;
}

protected void doWithSchema(
Catalog catalog, String schemaName, Consumer<Catalog> action, boolean dropSchema) {
Preconditions.checkNotNull(catalog);
Preconditions.checkNotNull(schemaName);
try {
tableEnv.useCatalog(catalog.name());
if (!catalog.asSchemas().schemaExists(schemaName)) {
catalog
.asSchemas()
.createSchema(
schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName));
catalog.asSchemas().createSchema(schemaName, null, schemaOptions(schemaName));
}
tableEnv.useDatabase(schemaName);
action.accept(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,4 +586,9 @@ public void testGetHiveTable() {
protected org.apache.gravitino.Catalog currentCatalog() {
return hiveCatalog;
}

@Override
protected Map<String, String> schemaOptions(String schemaName) {
return ImmutableMap.of("location", warehouse + "/" + schemaName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT {

private static org.apache.gravitino.Catalog catalog;

@Override
protected boolean supportColumnOperation() {
return false;
}

@Override
protected boolean supportTableOperation() {
return false;
}

@Override
protected boolean supportSchemaOperationWithCommentAndOptions() {
return false;
Expand Down

0 comments on commit 0b1ace7

Please sign in to comment.