Skip to content

Commit

Permalink
[apache#3526] feat(trino-connector): Support create Gravitino catalog…
Browse files Browse the repository at this point in the history
… by using Trino CREATE CATALOG command (apache#3540)

1. Upgrade code to Trino-435
2. Support create Gravitino catalog by CREATE CATALOG statement
3. Disable testers of  Trino Connector

Fix: apache#3526

NO

Exists UT
  • Loading branch information
diqiu50 committed Jun 13, 2024
1 parent 9f92b10 commit 7470778
Show file tree
Hide file tree
Showing 41 changed files with 432 additions and 377 deletions.
11 changes: 1 addition & 10 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,6 @@ nexusPublishing {
packageGroup.set("com.datastrato.gravitino")
}

dependencies {
testImplementation(libs.testng)
}

subprojects {
// Gravitino Python client project didn't need to apply the java plugin
if (project.name == "client-python") {
Expand Down Expand Up @@ -411,12 +407,7 @@ subprojects {
reports.html.outputLocation.set(file("${rootProject.projectDir}/build/reports/"))
val skipTests = project.hasProperty("skipTests")
if (!skipTests) {
if (project.name == "trino-connector") {
useTestNG()
maxHeapSize = "2G"
} else {
useJUnitPlatform()
}
useJUnitPlatform()

jvmArgs(project.property("extraJvmArgs") as List<*>)
finalizedBy(tasks.getByName("jacocoTestReport"))
Expand Down
13 changes: 6 additions & 7 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ log4j = "2.22.0"
jetty = "9.4.51.v20230217"
jersey = "2.41"
mockito = "4.11.0"
airlift-units = "1.8"
airlift-json = "237"
airlift-log = "231"
airlift-units = "1.8"
hive2 = "2.3.9"
hadoop2 = "2.10.2"
hadoop3 = "3.1.0"
Expand All @@ -29,19 +30,17 @@ caffeine = "2.9.3"
rocksdbjni = "7.10.2"
iceberg = '1.3.1' # used for Gravitino Iceberg catalog and Iceberg REST service
iceberg4spark = "1.4.1" # used for compile spark connector
trino = '426'
spark33 = "3.3.4"
spark33 = "3.3.4"
spark34 = "3.4.3"
spark35 = "3.5.1"
kyuubi4spark33 = "1.7.4"
kyuubi4spark34 = "1.8.2"
kyuubi4spark35 = "1.9.0"
trino = '435'
scala-collection-compat = "2.7.0"
scala-java-compat = "1.0.2"
sqlite-jdbc = "3.42.0.0"
testng = "7.5.1"
testcontainers = "1.19.0"
trino-jdbc = "426"
jwt = "0.11.1"
jline = "3.21.0"
okhttp3 = "4.11.0"
Expand Down Expand Up @@ -118,6 +117,7 @@ hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", version.
hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", version.ref = "hadoop3"}
hadoop3-mapreduce-client-core = { group = "org.apache.hadoop", name = "hadoop-mapreduce-client-core", version.ref = "hadoop3"}
hadoop3-minicluster = { group = "org.apache.hadoop", name = "hadoop-minicluster", version.ref = "hadoop-minikdc"}
airlift-json = { group = "io.airlift", name = "json", version.ref = "airlift-json"}
airlift-units = { group = "io.airlift", name = "units", version.ref = "airlift-units"}
airlift-log = { group = "io.airlift", name = "log", version.ref = "airlift-log"}
httpclient5 = { group = "org.apache.httpcomponents.client5", name = "httpclient5", version.ref = "httpclient5" }
Expand All @@ -138,13 +138,12 @@ trino-memory= { group = "io.trino", name = "trino-memory", version.ref = "trino"
trino-cli= { group = "io.trino", name = "trino-cli", version.ref = "trino" }
trino-client= { group = "io.trino", name = "trino-client", version.ref = "trino" }
sqlite-jdbc = { group = "org.xerial", name = "sqlite-jdbc", version.ref = "sqlite-jdbc" }
testng = { group = "org.testng", name = "testng", version.ref = "testng" }
commons-dbcp2 = { group = "org.apache.commons", name = "commons-dbcp2", version.ref = "commons-dbcp2" }
testcontainers = { group = "org.testcontainers", name = "testcontainers", version.ref = "testcontainers" }
testcontainers-mysql = { group = "org.testcontainers", name = "mysql", version.ref = "testcontainers" }
testcontainers-postgresql = { group = "org.testcontainers", name = "postgresql", version.ref = "testcontainers" }
testcontainers-junit-jupiter = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" }
trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino-jdbc" }
trino-jdbc = { group = "io.trino", name = "trino-jdbc", version.ref = "trino" }
jwt-api = { group = "io.jsonwebtoken", name = "jjwt-api", version.ref = "jwt"}
jwt-impl = { group = "io.jsonwebtoken", name = "jjwt-impl", version.ref = "jwt"}
jwt-gson = { group = "io.jsonwebtoken", name = "jjwt-gson", version.ref = "jwt"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled
@Deprecated
@Tag("gravitino-docker-it")
public class TrinoConnectorIT extends AbstractIT {
public static final Logger LOG = LoggerFactory.getLogger(TrinoConnectorIT.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled
@Tag("gravitino-docker-it")
public class TrinoQueryIT extends TrinoQueryITBase {
private static final Logger LOG = LoggerFactory.getLogger(TrinoQueryIT.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.openqa.selenium.By;

@Disabled
@Tag("gravitino-docker-it")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class CatalogsPageTest extends AbstractWebIT {
Expand Down
8 changes: 4 additions & 4 deletions trino-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ repositories {
dependencies {
implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))
implementation(project(":clients:client-java-runtime", configuration = "shadow"))
implementation(libs.commons.collections4)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.httpclient5)
implementation(libs.jackson.annotations)
implementation(libs.jackson.databind)
implementation(libs.commons.collections4)
implementation(libs.trino.jdbc)
compileOnly(libs.airlift.json)
compileOnly(libs.trino.spi) {
exclude("org.apache.logging.log4j")
}
Expand All @@ -34,6 +33,7 @@ dependencies {
testImplementation(libs.trino.testing) {
exclude("org.apache.logging.log4j")
}
testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks.named("generateMetadataFileForMavenJavaPublication") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

public class GravitinoConfig {

public static String GRAVITINO_DYNAMIC_CONNECTOR = "__gravitino.dynamic.connector";
public static final String GRAVITINO_DYNAMIC_CONNECTOR = "__gravitino.dynamic.connector";
public static final String GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG =
"__gravitino.dynamic.connector.catalog.config";
private static final Map<String, ConfigEntry> CONFIG_DEFINITIONS = new HashMap<>();

private final Map<String, String> config;
Expand All @@ -33,17 +35,18 @@ public class GravitinoConfig {

public GravitinoConfig(Map<String, String> requiredConfig) {
config = requiredConfig;

if (!isDynamicConnector()) {
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
ConfigEntry configDefinition = entry.getValue();
if (configDefinition.isRequired && !config.containsKey(configDefinition.key)) {
String message =
String.format("Missing gravitino config, %s is required", configDefinition.key);
throw new TrinoException(GRAVITINO_MISSING_CONFIG, message);
}
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
ConfigEntry configDefinition = entry.getValue();
if (configDefinition.isRequired && !config.containsKey(configDefinition.key)) {
String message =
String.format("Missing gravitino config, %s is required", configDefinition.key);
throw new TrinoException(GRAVITINO_MISSING_CONFIG, message);
}
}
if (isDynamicConnector() && !config.containsKey(GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG)) {
throw new TrinoException(
GRAVITINO_MISSING_CONFIG, "Incomplete Dynamic catalog connector config");
}
}

public String getURI() {
Expand Down Expand Up @@ -71,6 +74,10 @@ boolean isDynamicConnector() {
return config.getOrDefault(GRAVITINO_DYNAMIC_CONNECTOR, "false").equals("true");
}

public String getCatalogConfig() {
return config.get(GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG);
}

static class ConfigEntry {
final String key;
final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;

import com.datastrato.gravitino.client.GravitinoAdminClient;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorContext;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorFactory;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager;
import com.datastrato.gravitino.trino.connector.catalog.CatalogInjector;
Expand All @@ -32,6 +31,9 @@ public class GravitinoConnectorFactory implements ConnectorFactory {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoConnectorFactory.class);
private static final String DEFAULT_CONNECTOR_NAME = "gravitino";

@SuppressWarnings("UnusedVariable")
private GravitinoSystemTableFactory gravitinoSystemTableFactory;

private CatalogConnectorManager catalogConnectorManager;

@Override
Expand Down Expand Up @@ -71,8 +73,7 @@ public Connector create(
catalogConnectorManager.config(config);
catalogConnectorManager.start(clientProvider().get());

new GravitinoSystemTableFactory(catalogConnectorManager);

gravitinoSystemTableFactory = new GravitinoSystemTableFactory(catalogConnectorManager);
} catch (Exception e) {
LOG.error("Initialization of the GravitinoConnector failed.", e);
throw e;
Expand All @@ -83,10 +84,7 @@ public Connector create(
if (config.isDynamicConnector()) {
// The dynamic connector is an instance of GravitinoConnector. It is loaded from Gravitino
// server.
CatalogConnectorContext catalogConnectorContext =
catalogConnectorManager.getCatalogConnector(catalogName);
Preconditions.checkNotNull(catalogConnectorContext, "catalogConnector is not null");
return catalogConnectorContext.getConnector();
return catalogConnectorManager.createConnector(catalogName, config, context);
} else {
// The static connector is an instance of GravitinoSystemConnector. It is loaded by Trino
// using the connector configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RetryMode;
import io.trino.spi.connector.SaveMode;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SortItem;
import io.trino.spi.connector.TopNApplicationResult;
Expand Down Expand Up @@ -174,9 +175,10 @@ public ColumnMetadata getColumnMetadata(

@Override
public void createTable(
ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) {
ConnectorSession session, ConnectorTableMetadata tableMetadata, SaveMode saveMode) {
GravitinoTable table = metadataAdapter.createTable(tableMetadata);
catalogConnectorMetadata.createTable(table);
// saveMode = SaveMode.IGNORE is used to ignore the table creation if it already exists
catalogConnectorMetadata.createTable(table, saveMode == SaveMode.IGNORE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static java.util.Collections.emptyList;

import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import io.trino.spi.connector.Connector;
import io.trino.spi.session.PropertyMetadata;
import java.util.List;
import java.util.Map;
Expand All @@ -22,7 +23,10 @@ default List<PropertyMetadata<?>> getTableProperties() {
}

/** @return Return internal connector config with Trino. */
Map<String, Object> buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception;
Map<String, String> buildInternalConnectorConfig(GravitinoCatalog catalog) throws Exception;

/** @return Return internal connector with Trino. */
Connector buildInternalConnector(Map<String, String> config) throws Exception;

/** @return SchemaProperties list that used to validate schema properties. */
default List<PropertyMetadata<?>> getSchemaProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,42 +82,34 @@ public CatalogConnectorMetadataAdapter getMetadataAdapter() {

static class Builder {
private final CatalogConnectorAdapter connectorAdapter;
private GravitinoMetalake metalake;
private Connector internalConnector;
private GravitinoCatalog catalog;
private GravitinoMetalake metalake;

Builder(CatalogConnectorAdapter connectorAdapter) {
this.connectorAdapter = connectorAdapter;
}

public Builder clone() {
return new Builder(connectorAdapter);
private Builder(CatalogConnectorAdapter connectorAdapter, GravitinoCatalog catalog) {
this.connectorAdapter = connectorAdapter;
this.catalog = catalog;
}

public Map<String, Object> buildConfig(GravitinoCatalog catalog) throws Exception {
return connectorAdapter.buildInternalConnectorConfig(catalog);
public Builder clone(GravitinoCatalog catalog) {
return new Builder(connectorAdapter, catalog);
}

Builder withMetalake(GravitinoMetalake metalake) {
this.metalake = metalake;
return this;
}

Builder withInternalConnector(Connector internalConnector) {
this.internalConnector = internalConnector;
return this;
}

Builder withCatalog(GravitinoCatalog catalog) {
this.catalog = catalog;
return this;
}

CatalogConnectorContext build() {
CatalogConnectorContext build() throws Exception {
Preconditions.checkArgument(metalake != null, "metalake is not null");
Preconditions.checkArgument(internalConnector != null, "internalConnector is not null");
Preconditions.checkArgument(catalog != null, "catalog is not null");
return new CatalogConnectorContext(catalog, metalake, internalConnector, connectorAdapter);
Map<String, String> connectorConfig = connectorAdapter.buildInternalConnectorConfig(catalog);
Connector connector = connectorAdapter.buildInternalConnector(connectorConfig);

return new CatalogConnectorContext(catalog, metalake, connector, connectorAdapter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ public CatalogConnectorContext.Builder createCatalogConnectorContextBuilder(
}

// Avoid using the same builder object to prevent catalog creation errors.
return builder.clone();
return builder.clone(catalog);
}
}
Loading

0 comments on commit 7470778

Please sign in to comment.