Skip to content

Commit

Permalink
[#3601] feat(trino-connector): Support automatically loading catalog …
Browse files Browse the repository at this point in the history
…to Trino by Trino SQL command (#3602)

1、Support automatically loading catalog to Trino by using CREATE CATALOG
command
2、Support drop catalog
3、Support update catalog

Fix: #3601

NO

NO
  • Loading branch information
diqiu50 authored and jerryshao committed May 31, 2024
1 parent 29827e5 commit 569b096
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;

import io.trino.spi.TrinoException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;

public class GravitinoConfig {

Expand All @@ -33,6 +36,23 @@ public class GravitinoConfig {
"true",
false);

private static final ConfigEntry TRINO_JDBC_URI =
new ConfigEntry(
"trino.jdbc.uri", "The jdbc uri of Trino server", "jdbc:trino://localhost:8080", false);

private static final ConfigEntry TRINO_CATALOG_STORE =
new ConfigEntry(
"trino.catalog.store",
"The directory stored the catalog configuration of Trino",
"etc/catalog",
false);

private static final ConfigEntry TRINO_JDBC_USER =
new ConfigEntry("trino.jdbc.user", "The jdbc user name of Trino", "admin", false);

private static final ConfigEntry TRINO_JDBC_PASSWORD =
new ConfigEntry("trino.jdbc.password", "The jdbc user password of Trino", "", false);

public GravitinoConfig(Map<String, String> requiredConfig) {
config = requiredConfig;
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
Expand Down Expand Up @@ -78,6 +98,33 @@ public String getCatalogConfig() {
return config.get(GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG);
}

public String getTrinoURI() {
return config.getOrDefault(TRINO_JDBC_URI.key, TRINO_JDBC_URI.defaultValue);
}

public String getCatalogStoreDirectory() {
return config.getOrDefault(TRINO_CATALOG_STORE.key, TRINO_CATALOG_STORE.defaultValue);
}

public String getTrinoUser() {
return config.getOrDefault(TRINO_JDBC_USER.key, TRINO_JDBC_USER.defaultValue);
}

public String getTrinoPassword() {
return config.getOrDefault(TRINO_JDBC_PASSWORD.key, TRINO_JDBC_PASSWORD.defaultValue);
}

public String toCatalogConfig() {
List<String> stringList = new ArrayList<>();
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
String value = config.get(entry.getKey());
if (value != null) {
stringList.add(String.format("\"%s\"='%s'", entry.getKey(), value));
}
}
return StringUtils.join(stringList, ',');
}

static class ConfigEntry {
final String key;
final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS;
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR;

import com.datastrato.gravitino.client.GravitinoAdminClient;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorFactory;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager;
import com.datastrato.gravitino.trino.connector.catalog.CatalogInjector;
import com.datastrato.gravitino.trino.connector.catalog.CatalogRegister;
import com.datastrato.gravitino.trino.connector.system.GravitinoSystemConnector;
import com.datastrato.gravitino.trino.connector.system.storedprocdure.GravitinoStoredProcedureFactory;
import com.datastrato.gravitino.trino.connector.system.table.GravitinoSystemTableFactory;
Expand Down Expand Up @@ -64,19 +65,19 @@ public Connector create(
synchronized (this) {
if (catalogConnectorManager == null) {
try {
CatalogInjector catalogInjector = new CatalogInjector();
catalogInjector.init(context);
CatalogRegister catalogRegister = new CatalogRegister();
CatalogConnectorFactory catalogConnectorFactory = new CatalogConnectorFactory();

catalogConnectorManager =
new CatalogConnectorManager(catalogInjector, catalogConnectorFactory);
catalogConnectorManager.config(config);
catalogConnectorManager.start(clientProvider().get());
new CatalogConnectorManager(catalogRegister, catalogConnectorFactory);
catalogConnectorManager.config(config, clientProvider().get());
catalogConnectorManager.start(context);

gravitinoSystemTableFactory = new GravitinoSystemTableFactory(catalogConnectorManager);
} catch (Exception e) {
LOG.error("Initialization of the GravitinoConnector failed.", e);
throw e;
String message = "Initialization of the GravitinoConnector failed" + e.getMessage();
LOG.error(message);
throw new TrinoException(GRAVITINO_RUNTIME_ERROR, message, e);
}
}
}
Expand All @@ -100,7 +101,6 @@ public Connector create(
catalogConnectorManager.addMetalake(metalake);
GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory =
new GravitinoStoredProcedureFactory(catalogConnectorManager, metalake);

return new GravitinoSystemConnector(gravitinoStoredProcedureFactory);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum GravitinoErrorCode implements ErrorCodeSupplier {
GRAVITINO_METALAKE_ALREADY_EXISTS(21, EXTERNAL),
GRAVITINO_OPERATION_FAILED(22, EXTERNAL),
GRAVITINO_RUNTIME_ERROR(23, EXTERNAL),
GRAVITINO_DUPLICATED_CATALOGS(24, EXTERNAL),
;

// suppress ImmutableEnumChecker because ErrorCode is outside the project.
Expand Down
Loading

0 comments on commit 569b096

Please sign in to comment.