Skip to content

Commit

Permalink
[apache#3601] feat(trino-connector): Support automatically loading ca…
Browse files Browse the repository at this point in the history
…talog to Trino by Trino SQL command (apache#3602)

### What changes were proposed in this pull request?

1、Support automatically loading catalog to Trino by using CREATE CATALOG
command
2、Support drop catalog
3、Support update catalog

### Why are the changes needed?

Fix: apache#3601

### Does this PR introduce _any_ user-facing change?

NO

### How was this patch tested?

NO
  • Loading branch information
diqiu50 committed May 30, 2024
1 parent bb9081d commit 3952fea
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 427 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;

import io.trino.spi.TrinoException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.logging.log4j.util.Strings;

public class GravitinoConfig {

Expand All @@ -33,6 +36,23 @@ public class GravitinoConfig {
"true",
false);

private static final ConfigEntry TRINO_JDBC_URI =
new ConfigEntry(
"trino.jdbc.uri", "The jdbc uri of Trino server", "jdbc:trino://localhost:8080", false);

private static final ConfigEntry TRINO_CATALOG_STORE =
new ConfigEntry(
"trino.catalog.store",
"The directory stored the catalog configuration of Trino",
"etc/catalog",
false);

private static final ConfigEntry TRINO_JDBC_USER =
new ConfigEntry("trino.jdbc.user", "The jdbc user name of Trino", "admin", false);

private static final ConfigEntry TRINO_JDBC_PASSWORD =
new ConfigEntry("trino.jdbc.password", "The jdbc user password of Trino", "", false);

public GravitinoConfig(Map<String, String> requiredConfig) {
config = requiredConfig;
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
Expand Down Expand Up @@ -78,6 +98,33 @@ public String getCatalogConfig() {
return config.get(GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG);
}

public String getTrinoURI() {
return config.getOrDefault(TRINO_JDBC_URI.key, TRINO_JDBC_URI.defaultValue);
}

public String getCatalogStoreDirectory() {
return config.getOrDefault(TRINO_CATALOG_STORE.key, TRINO_CATALOG_STORE.defaultValue);
}

public String getTrinoUser() {
return config.getOrDefault(TRINO_JDBC_USER.key, TRINO_JDBC_USER.defaultValue);
}

public String getTrinoPassword() {
return config.getOrDefault(TRINO_JDBC_PASSWORD.key, TRINO_JDBC_PASSWORD.defaultValue);
}

public String toCatalogConfig() {
List<String> stringList = new ArrayList<>();
for (Map.Entry<String, ConfigEntry> entry : CONFIG_DEFINITIONS.entrySet()) {
String value = config.get(entry.getKey());
if (value != null) {
stringList.add(String.format("\"%s\"='%s'", entry.getKey(), value));
}
}
return Strings.join(stringList, ',');
}

static class ConfigEntry {
final String key;
final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS;
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG;
import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR;

import com.datastrato.gravitino.client.GravitinoAdminClient;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorFactory;
import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager;
import com.datastrato.gravitino.trino.connector.catalog.CatalogInjector;
import com.datastrato.gravitino.trino.connector.catalog.CatalogRegister;
import com.datastrato.gravitino.trino.connector.system.GravitinoSystemConnector;
import com.datastrato.gravitino.trino.connector.system.storedprocdure.GravitinoStoredProcedureFactory;
import com.datastrato.gravitino.trino.connector.system.table.GravitinoSystemTableFactory;
Expand Down Expand Up @@ -64,19 +65,19 @@ public Connector create(
synchronized (this) {
if (catalogConnectorManager == null) {
try {
CatalogInjector catalogInjector = new CatalogInjector();
catalogInjector.init(context);
CatalogRegister catalogRegister = new CatalogRegister();
CatalogConnectorFactory catalogConnectorFactory = new CatalogConnectorFactory();

catalogConnectorManager =
new CatalogConnectorManager(catalogInjector, catalogConnectorFactory);
catalogConnectorManager.config(config);
catalogConnectorManager.start(clientProvider().get());
new CatalogConnectorManager(catalogRegister, catalogConnectorFactory);
catalogConnectorManager.config(config, clientProvider().get());
catalogConnectorManager.start(context);

gravitinoSystemTableFactory = new GravitinoSystemTableFactory(catalogConnectorManager);
} catch (Exception e) {
LOG.error("Initialization of the GravitinoConnector failed.", e);
throw e;
String message = "Initialization of the GravitinoConnector failed" + e.getMessage();
LOG.error(message);
throw new TrinoException(GRAVITINO_RUNTIME_ERROR, message, e);
}
}
}
Expand All @@ -100,8 +101,6 @@ public Connector create(
catalogConnectorManager.addMetalake(metalake);
GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory =
new GravitinoStoredProcedureFactory(catalogConnectorManager, metalake);

catalogConnectorManager.loadMetalakeSync();
return new GravitinoSystemConnector(gravitinoStoredProcedureFactory);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public enum GravitinoErrorCode implements ErrorCodeSupplier {
GRAVITINO_METALAKE_ALREADY_EXISTS(21, EXTERNAL),
GRAVITINO_OPERATION_FAILED(22, EXTERNAL),
GRAVITINO_RUNTIME_ERROR(23, EXTERNAL),
GRAVITINO_DUPLICATED_CATALOGS(24, EXTERNAL),
;

// suppress ImmutableEnumChecker because ErrorCode is outside the project.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.datastrato.gravitino.exceptions.NoSuchMetalakeException;
import com.datastrato.gravitino.trino.connector.GravitinoConfig;
import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -62,7 +61,7 @@ public class CatalogConnectorManager {
private static final int LOAD_METALAKE_TIMEOUT = 30;

private final ScheduledExecutorService executorService;
private final CatalogInjector catalogInjector;
private final CatalogRegister catalogRegister;
private final CatalogConnectorFactory catalogConnectorFactory;

private final ConcurrentHashMap<String, CatalogConnectorContext> catalogConnectors =
Expand All @@ -75,8 +74,8 @@ public class CatalogConnectorManager {
private GravitinoConfig config;

public CatalogConnectorManager(
CatalogInjector catalogInjector, CatalogConnectorFactory catalogFactory) {
this.catalogInjector = catalogInjector;
CatalogRegister catalogRegister, CatalogConnectorFactory catalogFactory) {
this.catalogRegister = catalogRegister;
this.catalogConnectorFactory = catalogFactory;
this.executorService = createScheduledThreadPoolExecutor();
}
Expand All @@ -93,62 +92,56 @@ private static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() {
.build());
}

public void config(GravitinoConfig config) {
public void config(GravitinoConfig config, GravitinoAdminClient client) {
this.config = Preconditions.checkNotNull(config, "config is not null");
}

public void start(GravitinoAdminClient client) {
if (client == null) {
this.gravitinoClient = GravitinoAdminClient.builder(config.getURI()).build();
} else {
this.gravitinoClient = client;
}

LOG.info("Gravitino CatalogConnectorManager started.");
}

public void loadMetalakeSync() {
try {
Future<?> future = executorService.submit(this::loadMetalakeImpl);
future.get();
} catch (Exception e) {
LOG.error("Load metalake sync failed.", e);
} finally {
// Load metalake for handling catalog in the metalake updates.
public void start(ConnectorContext context) throws Exception {
catalogRegister.init(context, config);
if (catalogRegister.isCoordinator()) {
executorService.scheduleWithFixedDelay(
this::loadMetalakeImpl,
this::loadMetalake,
CATALOG_LOAD_FREQUENCY_SECOND,
CATALOG_LOAD_FREQUENCY_SECOND,
TimeUnit.SECONDS);
}

LOG.info("Gravitino CatalogConnectorManager started.");
}

private GravitinoMetalake retrieveMetalake(String metalakeName) {
try {
return gravitinoClient.loadMetalake(metalakeName);
} catch (NoSuchMetalakeException e) {
throw new TrinoException(
GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists.");
private void loadMetalake() {
if (!catalogRegister.isTrinoStarted()) {
LOG.info("Waiting for the Trino started.");
return;
}
}

private void loadMetalakeImpl() {
for (String usedMetalake : usedMetalakes) {
try {
GravitinoMetalake metalake =
metalakes.computeIfAbsent(usedMetalake, this::retrieveMetalake);
LOG.info("Load metalake: {}", usedMetalake);
loadCatalogs(metalake);
} catch (NoSuchMetalakeException noSuchMetalakeException) {
LOG.warn("Metalake {} does not exist.", usedMetalake);
} catch (Exception e) {
LOG.error("Load Metalake {} failed.", usedMetalake, e);
}
}
}

@VisibleForTesting
public void loadCatalogs(GravitinoMetalake metalake) {
private GravitinoMetalake retrieveMetalake(String metalakeName) {
try {
return gravitinoClient.loadMetalake(metalakeName);
} catch (NoSuchMetalakeException e) {
throw new TrinoException(
GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists.");
}
}

private void loadCatalogs(GravitinoMetalake metalake) {
NameIdentifier[] catalogNames;
try {
catalogNames = metalake.listCatalogs();
Expand All @@ -157,12 +150,10 @@ public void loadCatalogs(GravitinoMetalake metalake) {
return;
}

if (LOG.isInfoEnabled()) {
LOG.info(
"Load metalake {}'s catalogs. catalogs: {}.",
metalake.name(),
Arrays.toString(catalogNames));
}
LOG.info(
"Load metalake {}'s catalogs. catalogs: {}.",
metalake.name(),
Arrays.toString(catalogNames));

// Delete those catalogs that have been deleted in Gravitino server
Set<String> catalogNameStrings =
Expand All @@ -176,7 +167,7 @@ public void loadCatalogs(GravitinoMetalake metalake) {
// Skip the catalog doesn't belong to this metalake.
entry.getValue().getMetalake().name().equals(metalake.name())) {
try {
unloadCatalog(metalake, entry.getKey());
unloadCatalog(entry.getValue().getCatalog());
} catch (Exception e) {
LOG.error("Failed to remove catalog {}.", entry.getKey(), e);
}
Expand All @@ -192,11 +183,10 @@ public void loadCatalogs(GravitinoMetalake metalake) {
GravitinoCatalog gravitinoCatalog = new GravitinoCatalog(metalake.name(), catalog);
if (catalogConnectors.containsKey(getTrinoCatalogName(gravitinoCatalog))) {
// Reload catalogs that have been updated in Gravitino server.
reloadCatalog(metalake, gravitinoCatalog);

reloadCatalog(gravitinoCatalog);
} else {
if (catalog.type() == Catalog.Type.RELATIONAL) {
loadCatalog(metalake, gravitinoCatalog);
loadCatalog(gravitinoCatalog);
}
}
} catch (Exception e) {
Expand All @@ -206,28 +196,28 @@ public void loadCatalogs(GravitinoMetalake metalake) {
});
}

private void reloadCatalog(GravitinoMetalake metalake, GravitinoCatalog catalog) {
GravitinoCatalog oldCatalog = catalogConnectors.get(getTrinoCatalogName(catalog)).getCatalog();
private void reloadCatalog(GravitinoCatalog catalog) {
String catalogFullName = getTrinoCatalogName(catalog);
GravitinoCatalog oldCatalog = catalogConnectors.get(catalogFullName).getCatalog();
if (catalog.getLastModifiedTime() <= oldCatalog.getLastModifiedTime()) {
return;
}

catalogInjector.removeCatalogConnector((getTrinoCatalogName(catalog)));
catalogConnectors.remove(getTrinoCatalogName(catalog));
catalogRegister.unregisterCatalog(catalogFullName);
catalogConnectors.remove(catalogFullName);

loadCatalogImpl(metalake, catalog);
LOG.info("Update catalog '{}' in metalake {} successfully.", catalog, metalake.name());
loadCatalogImpl(catalog);
LOG.info("Update catalog '{}' in metalake {} successfully.", catalog, catalog.getMetalake());
}

private void loadCatalog(GravitinoMetalake metalake, GravitinoCatalog catalog) {
loadCatalogImpl(metalake, catalog);
LOG.info("Load catalog {} in metalake {} successfully.", catalog, metalake.name());
private void loadCatalog(GravitinoCatalog catalog) {
loadCatalogImpl(catalog);
LOG.info("Load catalog {} in metalake {} successfully.", catalog, catalog.getMetalake());
}

@SuppressWarnings("UnusedVariable")
private void loadCatalogImpl(GravitinoMetalake metalake, GravitinoCatalog catalog) {
private void loadCatalogImpl(GravitinoCatalog catalog) {
try {
throw new NotImplementedException();
catalogRegister.registerCatalog(getTrinoCatalogName(catalog), catalog);
} catch (Exception e) {
String message =
String.format("Failed to create internal catalog connector. The catalog is: %s", catalog);
Expand All @@ -236,10 +226,14 @@ private void loadCatalogImpl(GravitinoMetalake metalake, GravitinoCatalog catalo
}
}

private void unloadCatalog(GravitinoMetalake metalake, String catalogFullName) {
catalogInjector.removeCatalogConnector(catalogFullName);
private void unloadCatalog(GravitinoCatalog catalog) {
String catalogFullName = getTrinoCatalogName(catalog);
catalogRegister.unregisterCatalog(catalogFullName);
catalogConnectors.remove(catalogFullName);
LOG.info("Remove catalog '{}' in metalake {} successfully.", catalogFullName, metalake.name());
LOG.info(
"Remove catalog '{}' in metalake {} successfully.",
catalog.getName(),
catalog.getMetalake());
}

public CatalogConnectorContext getCatalogConnector(String catalogName) {
Expand Down Expand Up @@ -286,7 +280,7 @@ public void createCatalog(

LOG.info("Create catalog {} in metalake {} successfully.", catalogName, metalake);

Future<?> future = executorService.submit(this::loadMetalakeImpl);
Future<?> future = executorService.submit(this::loadMetalake);
future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS);

if (!catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) {
Expand All @@ -312,7 +306,6 @@ public void dropCatalog(String metalakeName, String catalogName, boolean ignoreN
try {
GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName);

// NameIdentifier catalog = NameIdentifier.of(metalakeName, catalogName);
if (!metalake.catalogExists(catalogName)) {
if (ignoreNotExist) {
return;
Expand All @@ -329,7 +322,7 @@ public void dropCatalog(String metalakeName, String catalogName, boolean ignoreN
}
LOG.info("Drop catalog {} in metalake {} successfully.", catalogName, metalake);

Future<?> future = executorService.submit(this::loadMetalakeImpl);
Future<?> future = executorService.submit(this::loadMetalake);
future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS);

if (catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) {
Expand Down Expand Up @@ -387,7 +380,7 @@ public void alterCatalog(
GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName);
metalake.alterCatalog(catalogName, changes.toArray(changes.toArray(new CatalogChange[0])));

Future<?> future = executorService.submit(this::loadMetalakeImpl);
Future<?> future = executorService.submit(this::loadMetalake);
future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS);

catalogConnectorContext =
Expand Down Expand Up @@ -436,7 +429,7 @@ public Connector createConnector(

CatalogConnectorContext connectorContext = builder.build();
catalogConnectors.put(connectorName, connectorContext);
LOG.info("Create connector success");
LOG.info("Create connector {} successful", connectorName);
return connectorContext.getConnector();
} catch (Exception e) {
LOG.error("Failed to create connector: {}", connectorName, e);
Expand Down
Loading

0 comments on commit 3952fea

Please sign in to comment.