diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConfig.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConfig.java index 0f430ec45c4..42f869b9823 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConfig.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConfig.java @@ -7,8 +7,11 @@ import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG; import io.trino.spi.TrinoException; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.logging.log4j.util.Strings; public class GravitinoConfig { @@ -33,6 +36,23 @@ public class GravitinoConfig { "true", false); + private static final ConfigEntry TRINO_JDBC_URI = + new ConfigEntry( + "trino.jdbc.uri", "The jdbc uri of Trino server", "jdbc:trino://localhost:8080", false); + + private static final ConfigEntry TRINO_CATALOG_STORE = + new ConfigEntry( + "trino.catalog.store", + "The directory stored the catalog configuration of Trino", + "etc/catalog", + false); + + private static final ConfigEntry TRINO_JDBC_USER = + new ConfigEntry("trino.jdbc.user", "The jdbc user name of Trino", "admin", false); + + private static final ConfigEntry TRINO_JDBC_PASSWORD = + new ConfigEntry("trino.jdbc.password", "The jdbc user password of Trino", "", false); + public GravitinoConfig(Map requiredConfig) { config = requiredConfig; for (Map.Entry entry : CONFIG_DEFINITIONS.entrySet()) { @@ -78,6 +98,33 @@ public String getCatalogConfig() { return config.get(GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG); } + public String getTrinoURI() { + return config.getOrDefault(TRINO_JDBC_URI.key, TRINO_JDBC_URI.defaultValue); + } + + public String getCatalogStoreDirectory() { + return config.getOrDefault(TRINO_CATALOG_STORE.key, TRINO_CATALOG_STORE.defaultValue); + } + + public String getTrinoUser() { + return config.getOrDefault(TRINO_JDBC_USER.key, TRINO_JDBC_USER.defaultValue); + } + + public String getTrinoPassword() { + return config.getOrDefault(TRINO_JDBC_PASSWORD.key, TRINO_JDBC_PASSWORD.defaultValue); + } + + public String toCatalogConfig() { + List stringList = new ArrayList<>(); + for (Map.Entry entry : CONFIG_DEFINITIONS.entrySet()) { + String value = config.get(entry.getKey()); + if (value != null) { + stringList.add(String.format("\"%s\"='%s'", entry.getKey(), value)); + } + } + return Strings.join(stringList, ','); + } + static class ConfigEntry { final String key; final String description; diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java index 67162de2ef3..a1605b8e8cf 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoConnectorFactory.java @@ -6,11 +6,12 @@ import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_METALAKE_NOT_EXISTS; import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR; import com.datastrato.gravitino.client.GravitinoAdminClient; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorFactory; import com.datastrato.gravitino.trino.connector.catalog.CatalogConnectorManager; -import com.datastrato.gravitino.trino.connector.catalog.CatalogInjector; +import com.datastrato.gravitino.trino.connector.catalog.CatalogRegister; import com.datastrato.gravitino.trino.connector.system.GravitinoSystemConnector; import com.datastrato.gravitino.trino.connector.system.storedprocdure.GravitinoStoredProcedureFactory; import com.datastrato.gravitino.trino.connector.system.table.GravitinoSystemTableFactory; @@ -64,19 +65,19 @@ public Connector create( synchronized (this) { if (catalogConnectorManager == null) { try { - CatalogInjector catalogInjector = new CatalogInjector(); - catalogInjector.init(context); + CatalogRegister catalogRegister = new CatalogRegister(); CatalogConnectorFactory catalogConnectorFactory = new CatalogConnectorFactory(); catalogConnectorManager = - new CatalogConnectorManager(catalogInjector, catalogConnectorFactory); - catalogConnectorManager.config(config); - catalogConnectorManager.start(clientProvider().get()); + new CatalogConnectorManager(catalogRegister, catalogConnectorFactory); + catalogConnectorManager.config(config, clientProvider().get()); + catalogConnectorManager.start(context); gravitinoSystemTableFactory = new GravitinoSystemTableFactory(catalogConnectorManager); } catch (Exception e) { - LOG.error("Initialization of the GravitinoConnector failed.", e); - throw e; + String message = "Initialization of the GravitinoConnector failed" + e.getMessage(); + LOG.error(message); + throw new TrinoException(GRAVITINO_RUNTIME_ERROR, message, e); } } } @@ -100,8 +101,6 @@ public Connector create( catalogConnectorManager.addMetalake(metalake); GravitinoStoredProcedureFactory gravitinoStoredProcedureFactory = new GravitinoStoredProcedureFactory(catalogConnectorManager, metalake); - - catalogConnectorManager.loadMetalakeSync(); return new GravitinoSystemConnector(gravitinoStoredProcedureFactory); } } diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoErrorCode.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoErrorCode.java index bf31d334f3b..3be2160d5c1 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoErrorCode.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/GravitinoErrorCode.java @@ -35,6 +35,7 @@ public enum GravitinoErrorCode implements ErrorCodeSupplier { GRAVITINO_METALAKE_ALREADY_EXISTS(21, EXTERNAL), GRAVITINO_OPERATION_FAILED(22, EXTERNAL), GRAVITINO_RUNTIME_ERROR(23, EXTERNAL), + GRAVITINO_DUPLICATED_CATALOGS(24, EXTERNAL), ; // suppress ImmutableEnumChecker because ErrorCode is outside the project. diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java index cba0b6c278a..3415f5bb9b5 100644 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogConnectorManager.java @@ -22,7 +22,6 @@ import com.datastrato.gravitino.exceptions.NoSuchMetalakeException; import com.datastrato.gravitino.trino.connector.GravitinoConfig; import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.trino.spi.TrinoException; @@ -62,7 +61,7 @@ public class CatalogConnectorManager { private static final int LOAD_METALAKE_TIMEOUT = 30; private final ScheduledExecutorService executorService; - private final CatalogInjector catalogInjector; + private final CatalogRegister catalogRegister; private final CatalogConnectorFactory catalogConnectorFactory; private final ConcurrentHashMap catalogConnectors = @@ -75,8 +74,8 @@ public class CatalogConnectorManager { private GravitinoConfig config; public CatalogConnectorManager( - CatalogInjector catalogInjector, CatalogConnectorFactory catalogFactory) { - this.catalogInjector = catalogInjector; + CatalogRegister catalogRegister, CatalogConnectorFactory catalogFactory) { + this.catalogRegister = catalogRegister; this.catalogConnectorFactory = catalogFactory; this.executorService = createScheduledThreadPoolExecutor(); } @@ -93,62 +92,56 @@ private static ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor() { .build()); } - public void config(GravitinoConfig config) { + public void config(GravitinoConfig config, GravitinoAdminClient client) { this.config = Preconditions.checkNotNull(config, "config is not null"); - } - - public void start(GravitinoAdminClient client) { if (client == null) { this.gravitinoClient = GravitinoAdminClient.builder(config.getURI()).build(); } else { this.gravitinoClient = client; } - - LOG.info("Gravitino CatalogConnectorManager started."); } - public void loadMetalakeSync() { - try { - Future future = executorService.submit(this::loadMetalakeImpl); - future.get(); - } catch (Exception e) { - LOG.error("Load metalake sync failed.", e); - } finally { - // Load metalake for handling catalog in the metalake updates. + public void start(ConnectorContext context) throws Exception { + catalogRegister.init(context, config); + if (catalogRegister.isCoordinator()) { executorService.scheduleWithFixedDelay( - this::loadMetalakeImpl, + this::loadMetalake, CATALOG_LOAD_FREQUENCY_SECOND, CATALOG_LOAD_FREQUENCY_SECOND, TimeUnit.SECONDS); } + + LOG.info("Gravitino CatalogConnectorManager started."); } - private GravitinoMetalake retrieveMetalake(String metalakeName) { - try { - return gravitinoClient.loadMetalake(metalakeName); - } catch (NoSuchMetalakeException e) { - throw new TrinoException( - GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists."); + private void loadMetalake() { + if (!catalogRegister.isTrinoStarted()) { + LOG.info("Waiting for the Trino started."); + return; } - } - private void loadMetalakeImpl() { for (String usedMetalake : usedMetalakes) { try { GravitinoMetalake metalake = metalakes.computeIfAbsent(usedMetalake, this::retrieveMetalake); LOG.info("Load metalake: {}", usedMetalake); loadCatalogs(metalake); - } catch (NoSuchMetalakeException noSuchMetalakeException) { - LOG.warn("Metalake {} does not exist.", usedMetalake); } catch (Exception e) { LOG.error("Load Metalake {} failed.", usedMetalake, e); } } } - @VisibleForTesting - public void loadCatalogs(GravitinoMetalake metalake) { + private GravitinoMetalake retrieveMetalake(String metalakeName) { + try { + return gravitinoClient.loadMetalake(metalakeName); + } catch (NoSuchMetalakeException e) { + throw new TrinoException( + GRAVITINO_METALAKE_NOT_EXISTS, "Metalake " + metalakeName + " not exists."); + } + } + + private void loadCatalogs(GravitinoMetalake metalake) { NameIdentifier[] catalogNames; try { catalogNames = metalake.listCatalogs(); @@ -157,12 +150,10 @@ public void loadCatalogs(GravitinoMetalake metalake) { return; } - if (LOG.isInfoEnabled()) { - LOG.info( - "Load metalake {}'s catalogs. catalogs: {}.", - metalake.name(), - Arrays.toString(catalogNames)); - } + LOG.info( + "Load metalake {}'s catalogs. catalogs: {}.", + metalake.name(), + Arrays.toString(catalogNames)); // Delete those catalogs that have been deleted in Gravitino server Set catalogNameStrings = @@ -176,7 +167,7 @@ public void loadCatalogs(GravitinoMetalake metalake) { // Skip the catalog doesn't belong to this metalake. entry.getValue().getMetalake().name().equals(metalake.name())) { try { - unloadCatalog(metalake, entry.getKey()); + unloadCatalog(entry.getValue().getCatalog()); } catch (Exception e) { LOG.error("Failed to remove catalog {}.", entry.getKey(), e); } @@ -192,11 +183,10 @@ public void loadCatalogs(GravitinoMetalake metalake) { GravitinoCatalog gravitinoCatalog = new GravitinoCatalog(metalake.name(), catalog); if (catalogConnectors.containsKey(getTrinoCatalogName(gravitinoCatalog))) { // Reload catalogs that have been updated in Gravitino server. - reloadCatalog(metalake, gravitinoCatalog); - + reloadCatalog(gravitinoCatalog); } else { if (catalog.type() == Catalog.Type.RELATIONAL) { - loadCatalog(metalake, gravitinoCatalog); + loadCatalog(gravitinoCatalog); } } } catch (Exception e) { @@ -206,28 +196,28 @@ public void loadCatalogs(GravitinoMetalake metalake) { }); } - private void reloadCatalog(GravitinoMetalake metalake, GravitinoCatalog catalog) { - GravitinoCatalog oldCatalog = catalogConnectors.get(getTrinoCatalogName(catalog)).getCatalog(); + private void reloadCatalog(GravitinoCatalog catalog) { + String catalogFullName = getTrinoCatalogName(catalog); + GravitinoCatalog oldCatalog = catalogConnectors.get(catalogFullName).getCatalog(); if (catalog.getLastModifiedTime() <= oldCatalog.getLastModifiedTime()) { return; } - catalogInjector.removeCatalogConnector((getTrinoCatalogName(catalog))); - catalogConnectors.remove(getTrinoCatalogName(catalog)); + catalogRegister.unregisterCatalog(catalogFullName); + catalogConnectors.remove(catalogFullName); - loadCatalogImpl(metalake, catalog); - LOG.info("Update catalog '{}' in metalake {} successfully.", catalog, metalake.name()); + loadCatalogImpl(catalog); + LOG.info("Update catalog '{}' in metalake {} successfully.", catalog, catalog.getMetalake()); } - private void loadCatalog(GravitinoMetalake metalake, GravitinoCatalog catalog) { - loadCatalogImpl(metalake, catalog); - LOG.info("Load catalog {} in metalake {} successfully.", catalog, metalake.name()); + private void loadCatalog(GravitinoCatalog catalog) { + loadCatalogImpl(catalog); + LOG.info("Load catalog {} in metalake {} successfully.", catalog, catalog.getMetalake()); } - @SuppressWarnings("UnusedVariable") - private void loadCatalogImpl(GravitinoMetalake metalake, GravitinoCatalog catalog) { + private void loadCatalogImpl(GravitinoCatalog catalog) { try { - throw new NotImplementedException(); + catalogRegister.registerCatalog(getTrinoCatalogName(catalog), catalog); } catch (Exception e) { String message = String.format("Failed to create internal catalog connector. The catalog is: %s", catalog); @@ -236,10 +226,14 @@ private void loadCatalogImpl(GravitinoMetalake metalake, GravitinoCatalog catalo } } - private void unloadCatalog(GravitinoMetalake metalake, String catalogFullName) { - catalogInjector.removeCatalogConnector(catalogFullName); + private void unloadCatalog(GravitinoCatalog catalog) { + String catalogFullName = getTrinoCatalogName(catalog); + catalogRegister.unregisterCatalog(catalogFullName); catalogConnectors.remove(catalogFullName); - LOG.info("Remove catalog '{}' in metalake {} successfully.", catalogFullName, metalake.name()); + LOG.info( + "Remove catalog '{}' in metalake {} successfully.", + catalog.getName(), + catalog.getMetalake()); } public CatalogConnectorContext getCatalogConnector(String catalogName) { @@ -286,7 +280,7 @@ public void createCatalog( LOG.info("Create catalog {} in metalake {} successfully.", catalogName, metalake); - Future future = executorService.submit(this::loadMetalakeImpl); + Future future = executorService.submit(this::loadMetalake); future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); if (!catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) { @@ -312,7 +306,6 @@ public void dropCatalog(String metalakeName, String catalogName, boolean ignoreN try { GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName); - // NameIdentifier catalog = NameIdentifier.of(metalakeName, catalogName); if (!metalake.catalogExists(catalogName)) { if (ignoreNotExist) { return; @@ -329,7 +322,7 @@ public void dropCatalog(String metalakeName, String catalogName, boolean ignoreN } LOG.info("Drop catalog {} in metalake {} successfully.", catalogName, metalake); - Future future = executorService.submit(this::loadMetalakeImpl); + Future future = executorService.submit(this::loadMetalake); future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); if (catalogConnectors.containsKey(getTrinoCatalogName(metalakeName, catalogName))) { @@ -387,7 +380,7 @@ public void alterCatalog( GravitinoMetalake metalake = gravitinoClient.loadMetalake(metalakeName); metalake.alterCatalog(catalogName, changes.toArray(changes.toArray(new CatalogChange[0]))); - Future future = executorService.submit(this::loadMetalakeImpl); + Future future = executorService.submit(this::loadMetalake); future.get(LOAD_METALAKE_TIMEOUT, TimeUnit.SECONDS); catalogConnectorContext = @@ -436,7 +429,7 @@ public Connector createConnector( CatalogConnectorContext connectorContext = builder.build(); catalogConnectors.put(connectorName, connectorContext); - LOG.info("Create connector success"); + LOG.info("Create connector {} successful", connectorName); return connectorContext.getConnector(); } catch (Exception e) { LOG.error("Failed to create connector: {}", connectorName, e); diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogInjector.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogInjector.java deleted file mode 100644 index a4a35de70db..00000000000 --- a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogInjector.java +++ /dev/null @@ -1,356 +0,0 @@ -/* - * Copyright 2023 Datastrato Pvt Ltd. - * This software is licensed under the Apache License version 2. - */ -package com.datastrato.gravitino.trino.connector.catalog; - -import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CREATE_INNER_CONNECTOR_FAILED; -import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION; - -import com.datastrato.gravitino.trino.connector.GravitinoConfig; -import com.datastrato.gravitino.trino.connector.GravitinoErrorCode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import io.trino.spi.TrinoException; -import io.trino.spi.connector.Connector; -import io.trino.spi.connector.ConnectorContext; -import io.trino.spi.connector.MetadataProvider; -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class dynamically injects the Catalog managed by Gravitino into Trino using reflection - * techniques. It allows it to be used in Trino like a regular Trino catalog. In Gravitino, the - * catalog name consists of the "metalake" and catalog name, for example, "user_0.hive_us." We can - * use it directly in Trino. - */ -public class CatalogInjector { - - private static final Logger LOG = LoggerFactory.getLogger(CatalogInjector.class); - - private static final int MIN_TRINO_SPI_VERSION = 360; - - // It is used to inject catalogs to trino - private InjectCatalogHandle injectHandle; - // It's used to remove catalogs from trino - private RemoveCatalogHandle removeHandle; - - // It is used to create internal catalogs. - private CreateCatalogHandle createHandle; - private String trinoVersion; - - private ConcurrentHashMap internalCatalogs = new ConcurrentHashMap<>(); - - private void checkTrinoSpiVersion(ConnectorContext context) { - this.trinoVersion = context.getSpiVersion(); - - int version = Integer.parseInt(context.getSpiVersion()); - if (version < MIN_TRINO_SPI_VERSION) { - String errmsg = - String.format( - "Unsupported trino-%s version. min support version is trino-%d", - trinoVersion, MIN_TRINO_SPI_VERSION); - throw new TrinoException(GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg); - } - } - - private static Field getField(Object targetObject, String fieldName) throws NoSuchFieldException { - Field field = targetObject.getClass().getDeclaredField(fieldName); - field.setAccessible(true); - return field; - } - - private static Object getFiledObject(Object targetObject, String fieldName) - throws NoSuchFieldException, IllegalAccessException { - return getField(targetObject, fieldName).get(targetObject); - } - - private static boolean isClassObject(Object targetObject, String className) { - return targetObject.getClass().getName().endsWith(className); - } - - private static Class getClass(ClassLoader classLoader, String className) - throws ClassNotFoundException { - return classLoader.loadClass(className); - } - - /** - * @param context - *
-   *  This function does the following tasks by ConnectorContext:
-   *  1. Retrieve the DiscoveryNodeManager object.
-   *  2. To enable Trino to handle tables on every node,
-   *  set 'allCatalogsOnAllNodes' to 'true' and 'activeNodesByCatalogHandle' to empty.
-   *  3. Retrieve the catalogManager object.
-   *  4. Get createCatalog function in catalogFactory
-   *  5. Create a CreateCatalogHandle for the Gravitino connector's internal connector.
-   *  6. Create InjectCatalogHandle for injection catalogs to trino.
-   *
-   *  A runtime ConnectorContext hierarchy:
-   *  context (ConnectorContext)
-   *  --nodeManager (ConnectorAwareNodeManager)
-   *  ----nodeManager (DiscoveryNodeManager)
-   *  ------nodeManager (DiscoveryNodeManager)
-   *  ------allCatalogsOnAllNodes (boolean)
-   *  ------activeNodesByCatalogHandle (SetMultimap)
-   *  --metadataProvider(InternalMetadataProvider)
-   *  ----metadata (TracingMetadata)
-   *  ------delegate (MetadataManager)
-   *  --------transactionManager (InMemoryTransactionManager)
-   *  ----------catalogManager (StaticCatalogManager)
-   *  ------------catalogFactory (LazyCatalogFactory)
-   *  --------------createCatalog() (Function)
-   *  ------------catalogs (ConcurrentHashMap)
-   * 
- */ - public void init(ConnectorContext context) { - // Injector trino catalog need NodeManager support allCatalogsOnAllNodes; - checkTrinoSpiVersion(context); - - try { - // 1. Retrieve the DiscoveryNodeManager object. - Object nodeManager = context.getNodeManager(); - nodeManager = getFiledObject(nodeManager, "nodeManager"); - - if (isClassObject(nodeManager, "DiscoveryNodeManager")) { - // 2. To enable Trino to handle tables on every node - Field allCatalogsOnAllNodes = getField(nodeManager, "allCatalogsOnAllNodes"); - allCatalogsOnAllNodes.setBoolean(nodeManager, true); - - Field activeNodesByCatalogHandle = getField(nodeManager, "activeNodesByCatalogHandle"); - activeNodesByCatalogHandle.set(nodeManager, Optional.empty()); - } - - // 3. Retrieve the catalogManager object. - MetadataProvider metadataProvider = context.getMetadataProvider(); - - Object metadata = getFiledObject(metadataProvider, "metadata"); - Object metadataManager = metadata; - if (isClassObject(metadata, "TracingMetadata")) { - metadataManager = getFiledObject(metadata, "delegate"); - } - Preconditions.checkNotNull(metadataManager, "metadataManager should not be null"); - - Object transactionManager = getFiledObject(metadataManager, "transactionManager"); - Object catalogManager = getFiledObject(transactionManager, "catalogManager"); - Preconditions.checkNotNull(catalogManager, "catalogManager should not be null"); - - // 4. Get createCatalog function in catalogFactory - Object catalogFactory = getFiledObject(catalogManager, "catalogFactory"); - Preconditions.checkNotNull(catalogFactory, "catalogFactory should not be null"); - - Class catalogPropertiesClass = - getClass( - catalogManager.getClass().getClassLoader(), "io.trino.connector.CatalogProperties"); - Method createCatalogMethod = - catalogFactory.getClass().getDeclaredMethod("createCatalog", catalogPropertiesClass); - Preconditions.checkNotNull(createCatalogMethod, "createCatalogMethod should not be null"); - - // 5. Create a CreateCatalogHandle - createHandle = - (catalogName, catalogProperties) -> { - ObjectMapper objectMapper = new ObjectMapper(); - Object catalogPropertiesObject = - objectMapper.readValue(catalogProperties, catalogPropertiesClass); - - // Call catalogFactory.createCatalog() return CatalogConnector - Object catalogConnector = - createCatalogMethod.invoke(catalogFactory, catalogPropertiesObject); - internalCatalogs.put(catalogName, catalogConnector); - - // The catalogConnector hierarchy: - // --catalogConnector (CatalogConnector) - // ----catalogConnector (ConnectorServices) - // ------connector (Connector) - - // Get a connector object from trino CatalogConnector. - Object catalogConnectorObject = getFiledObject(catalogConnector, "catalogConnector"); - - return getFiledObject(catalogConnectorObject, "connector"); - }; - - // 6. Create InjectCatalogHandle - createInjectHandler( - catalogManager, catalogFactory, createCatalogMethod, catalogPropertiesClass); - - removeInjectHandle(catalogManager); - LOG.info("Bind Trino catalog manager successfully."); - } catch (Exception e) { - String message = - String.format( - "Bind Trino catalog manager failed, unsupported trino-%s version", trinoVersion); - LOG.error(message, e); - throw new TrinoException(GRAVITINO_UNSUPPORTED_TRINO_VERSION, message, e); - } - } - - private void removeInjectHandle(Object catalogManager) - throws NoSuchFieldException, IllegalAccessException { - if (isClassObject(catalogManager, "CoordinatorDynamicCatalogManager")) { - ConcurrentHashMap activeCatalogs = - (ConcurrentHashMap) getFiledObject(catalogManager, "activeCatalogs"); - Preconditions.checkNotNull(activeCatalogs, "activeCatalogs should not be null"); - - ConcurrentHashMap allCatalogs = - (ConcurrentHashMap) getFiledObject(catalogManager, "allCatalogs"); - Preconditions.checkNotNull(allCatalogs, "allCatalogs should not be null"); - - removeHandle = - (catalogName) -> { - activeCatalogs.remove(catalogName); - allCatalogs.remove(catalogName); - }; - } else { - // The catalogManager is an instance of StaticCatalogManager - ConcurrentHashMap catalogs = (ConcurrentHashMap) getFiledObject(catalogManager, "catalogs"); - Preconditions.checkNotNull(catalogs, "catalogs should not be null"); - removeHandle = - (catalogName) -> { - Object catalogConnector = catalogs.remove(catalogName); - if (catalogConnector != null) { - Method shutdown = catalogConnector.getClass().getDeclaredMethod("shutdown"); - shutdown.invoke(catalogConnector); - Object internalCatalogConnector = internalCatalogs.get(catalogName); - shutdown.invoke(internalCatalogConnector); - } - }; - } - } - - private void createInjectHandler( - Object catalogManager, - Object catalogFactory, - Method createCatalogMethod, - Class catalogPropertiesClass) - throws NoSuchFieldException, IllegalAccessException { - // The catalogManager is an instance of CoordinatorDynamicCatalogManager - if (isClassObject(catalogManager, "CoordinatorDynamicCatalogManager")) { - ConcurrentHashMap activeCatalogs = - (ConcurrentHashMap) getFiledObject(catalogManager, "activeCatalogs"); - Preconditions.checkNotNull(activeCatalogs, "activeCatalogs should not be null"); - - ConcurrentHashMap allCatalogs = - (ConcurrentHashMap) getFiledObject(catalogManager, "allCatalogs"); - Preconditions.checkNotNull(allCatalogs, "allCatalogs should not be null"); - - injectHandle = - (catalogName, catalogProperties) -> { - // Call CatalogFactory:createCatalog and add the catalog to - // CoordinatorDynamicCatalogManager - ObjectMapper objectMapper = new ObjectMapper(); - Object catalogPropertiesObject = - objectMapper.readValue(catalogProperties, catalogPropertiesClass); - Object catalogConnector = - createCatalogMethod.invoke(catalogFactory, catalogPropertiesObject); - - Field catalogField = catalogConnector.getClass().getDeclaredField("catalog"); - catalogField.setAccessible(true); - Object catalog = catalogField.get(catalogConnector); - activeCatalogs.put(catalogName, catalog); - - Field catelogHandleField = - catalogConnector.getClass().getDeclaredField("catalogHandle"); - catelogHandleField.setAccessible(true); - Object catalogHandle = catelogHandleField.get(catalogConnector); - allCatalogs.put(catalogHandle, catalogConnector); - }; - } else { - // The catalogManager is an instance of StaticCatalogManager - ConcurrentHashMap catalogs = (ConcurrentHashMap) getFiledObject(catalogManager, "catalogs"); - Preconditions.checkNotNull(catalogs, "catalogs should not be null"); - - injectHandle = - (catalogName, catalogProperties) -> { - // call CatalogFactory:createCatalog and add the catalog to StaticCatalogManager - ObjectMapper objectMapper = new ObjectMapper(); - Object catalogPropertiesObject = - objectMapper.readValue(catalogProperties, catalogPropertiesClass); - - Object catalogConnector = - createCatalogMethod.invoke(catalogFactory, catalogPropertiesObject); - if (catalogs.containsKey(catalogName)) { - String message = - String.format("Inject catalog failed. catalog %s already exists", catalogName); - LOG.error(message); - throw new TrinoException(GRAVITINO_CREATE_INNER_CONNECTOR_FAILED, message); - } - catalogs.put(catalogName, catalogConnector); - }; - } - } - - void removeCatalogConnector(String catalogName) { - try { - removeHandle.invoke(catalogName); - LOG.info("Remove trino catalog {} successfully.", catalogName); - } catch (Exception e) { - LOG.error("Remove trino catalog {} failed.", catalogName, e); - throw new TrinoException(GRAVITINO_CREATE_INNER_CONNECTOR_FAILED, e); - } - } - - void injectCatalogConnector(String catalogName) { - try { - String catalogProperties = createCatalogProperties(catalogName); - injectHandle.invoke(catalogName, catalogProperties); - - LOG.info("Inject trino catalog {} successfully.", catalogName); - } catch (Exception e) { - LOG.error("Inject trino catalog {} failed.", catalogName, e); - throw new TrinoException(GRAVITINO_CREATE_INNER_CONNECTOR_FAILED, e); - } - } - - String createCatalogProperties(String catalogName) { - String catalogPropertiesTemplate = - "{\"catalogHandle\": \"%s:normal:default\",\"connectorName\":\"gravitino\", \"properties\": " - + "{\"" - + GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR - + "\": \"true\"}" - + "}"; - return String.format(catalogPropertiesTemplate, catalogName); - } - - Connector createConnector(String connectorName, Map properties) { - String connectorProperties; - try { - ObjectMapper objectMapper = new ObjectMapper(); - connectorProperties = objectMapper.writeValueAsString(properties); - if (LOG.isDebugEnabled()) { - LOG.debug( - "Create internal catalog connector {}. The config:{} .", - connectorName, - connectorProperties); - } - - Object catalogConnector = createHandle.invoke(connectorName, connectorProperties); - - LOG.info("Create internal catalog connector {} successfully.", connectorName); - return (Connector) catalogConnector; - } catch (Exception e) { - LOG.error( - "Create internal catalog connector {} failed. Connector properties: {} ", - connectorName, - properties, - e); - throw new TrinoException(GRAVITINO_CREATE_INNER_CONNECTOR_FAILED, e); - } - } - - interface InjectCatalogHandle { - void invoke(String name, String properties) throws Exception; - } - - interface CreateCatalogHandle { - Object invoke(String name, String properties) throws Exception; - } - - interface RemoveCatalogHandle { - void invoke(String catalogName) throws Exception; - } -} diff --git a/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogRegister.java b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogRegister.java new file mode 100644 index 00000000000..ae12f299d77 --- /dev/null +++ b/trino-connector/src/main/java/com/datastrato/gravitino/trino/connector/catalog/CatalogRegister.java @@ -0,0 +1,235 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.trino.connector.catalog; + +import static com.datastrato.gravitino.trino.connector.GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR; +import static com.datastrato.gravitino.trino.connector.GravitinoConfig.GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_CATALOG_ALREADY_EXISTS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_DUPLICATED_CATALOGS; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_MISSING_CONFIG; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_RUNTIME_ERROR; +import static com.datastrato.gravitino.trino.connector.GravitinoErrorCode.GRAVITINO_UNSUPPORTED_TRINO_VERSION; + +import com.datastrato.gravitino.trino.connector.GravitinoConfig; +import com.datastrato.gravitino.trino.connector.metadata.GravitinoCatalog; +import io.trino.jdbc.TrinoDriver; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorContext; +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class dynamically register the Catalog managed by Gravitino into Trino using Trino CREATE + * CATALOG statement. It allows the catalog to be used in Trino like a regular Trino catalog. + */ +public class CatalogRegister { + + private static final Logger LOG = LoggerFactory.getLogger(CatalogRegister.class); + + private static final int MIN_TRINO_SPI_VERSION = 435; + private static final int EXECUTE_QUERY_MAX_RETRIES = 6; + private static final int EXECUTE_QUERY_BACKOFF_TIME_SECOND = 5; + + private String trinoVersion; + private Connection connection; + private boolean isCoordinator; + private boolean isStarted = false; + private String catalogStoreDirectory; + private GravitinoConfig config; + + private void checkTrinoSpiVersion(ConnectorContext context) { + this.trinoVersion = context.getSpiVersion(); + + int version = Integer.parseInt(context.getSpiVersion()); + if (version < MIN_TRINO_SPI_VERSION) { + String errmsg = + String.format( + "Unsupported Trino-%s version. min support version is Trino-%d", + trinoVersion, MIN_TRINO_SPI_VERSION); + throw new TrinoException(GRAVITINO_UNSUPPORTED_TRINO_VERSION, errmsg); + } + + isCoordinator = context.getNodeManager().getCurrentNode().isCoordinator(); + } + + boolean isCoordinator() { + return isCoordinator; + } + + boolean isTrinoStarted() { + if (isStarted) { + return true; + } + + String command = "SELECT 1"; + try (Statement statement = connection.createStatement()) { + isStarted = statement.execute(command); + return isStarted; + } catch (Exception e) { + LOG.warn("Trino server is not started: {}", e.getMessage()); + return false; + } + } + + public void init(ConnectorContext context, GravitinoConfig config) throws Exception { + this.config = config; + checkTrinoSpiVersion(context); + + TrinoDriver driver = new TrinoDriver(); + DriverManager.registerDriver(driver); + + Properties properties = new Properties(); + properties.put("user", config.getTrinoUser()); + properties.put("password", config.getTrinoPassword()); + try { + connection = driver.connect(config.getTrinoURI(), properties); + } catch (SQLException e) { + throw new TrinoException( + GRAVITINO_RUNTIME_ERROR, "Failed to initialize the Trino connection.", e); + } + + catalogStoreDirectory = config.getCatalogStoreDirectory(); + if (!Files.exists(Path.of(catalogStoreDirectory))) { + throw new TrinoException( + GRAVITINO_MISSING_CONFIG, + String.format( + "Error config for Trino catalog store directory %s, file not found", + catalogStoreDirectory)); + } + } + + private String generateCreateCatalogCommand(String name, GravitinoCatalog gravitinoCatalog) + throws Exception { + return String.format( + "CREATE CATALOG %s USING gravitino WITH ( \"%s\" = 'true', \"%s\" = '%s', %s)", + name, + GRAVITINO_DYNAMIC_CONNECTOR, + GRAVITINO_DYNAMIC_CONNECTOR_CATALOG_CONFIG, + GravitinoCatalog.toJson(gravitinoCatalog), + config.toCatalogConfig()); + } + + private String generateDropCatalogCommand(String name) throws Exception { + return String.format("DROP CATALOG %s", name); + } + + public void registerCatalog(String name, GravitinoCatalog catalog) { + try { + String catalogFileName = String.format("%s/%s.properties", catalogStoreDirectory, name); + File catalogFile = new File(catalogFileName); + if (catalogFile.exists()) { + String catalogContents = Files.readString(catalogFile.toPath()); + if (!catalogContents.contains(GRAVITINO_DYNAMIC_CONNECTOR + "=true")) { + throw new TrinoException( + GRAVITINO_DUPLICATED_CATALOGS, + "Catalog already exists, the catalog is not created by gravitino"); + } else { + throw new TrinoException( + GRAVITINO_CATALOG_ALREADY_EXISTS, + String.format( + "Catalog %s in metalake %s already exists", + catalog.getName(), catalog.getMetalake())); + } + } + + if (checkCatalogExist(name)) { + throw new TrinoException( + GRAVITINO_DUPLICATED_CATALOGS, "Catalog already exists with unknown reason"); + } + String createCatalogCommand = generateCreateCatalogCommand(name, catalog); + executeSql(createCatalogCommand); + LOG.info("Register catalog {} successfully: {}", name, createCatalogCommand); + } catch (Exception e) { + String message = String.format("Failed to register catalog %s", name); + LOG.error(message); + throw new TrinoException(GRAVITINO_RUNTIME_ERROR, message, e); + } + } + + private boolean checkCatalogExist(String name) { + String showCatalogCommand = String.format("SHOW CATALOGS like '%s'", name); + Exception failedException = null; + try { + int retries = EXECUTE_QUERY_MAX_RETRIES; + while (retries-- > 0) { + try (Statement statement = connection.createStatement()) { + // check the catalog is already created + statement.execute(showCatalogCommand); + ResultSet rs = statement.getResultSet(); + while (rs.next()) { + String catalogName = rs.getString(1); + if (catalogName.equals(name) || catalogName.equals("\"" + name + "\"")) { + return true; + } + } + return false; + } catch (Exception e) { + failedException = e; + LOG.warn("Execute command failed: {}, ", showCatalogCommand, e); + Thread.sleep(EXECUTE_QUERY_BACKOFF_TIME_SECOND * 1000); + } + } + throw failedException; + } catch (Exception e) { + throw new TrinoException(GRAVITINO_RUNTIME_ERROR, "Failed to check catalog exist", e); + } + } + + private void executeSql(String sql) { + try { + int retries = EXECUTE_QUERY_MAX_RETRIES; + Exception failedException = null; + while (retries-- > 0) { + try (Statement statement = connection.createStatement()) { + // check the catalog is already created + statement.execute(sql); + return; + } catch (Exception e) { + failedException = e; + LOG.warn("Execute command failed: {}, ", sql, e); + Thread.sleep(EXECUTE_QUERY_BACKOFF_TIME_SECOND * 1000); + } + } + throw failedException; + } catch (Exception e) { + throw new TrinoException(GRAVITINO_RUNTIME_ERROR, "Failed to execute query: " + sql, e); + } + } + + public void unregisterCatalog(String name) { + try { + if (!checkCatalogExist(name)) { + LOG.warn("Catalog {} does not exist", name); + return; + } + String dropCatalogCommand = generateDropCatalogCommand(name); + executeSql(dropCatalogCommand); + LOG.info("Unregister catalog {} successfully: {}", name, dropCatalogCommand); + } catch (Exception e) { + String message = String.format("Failed to unregister catalog %s", name); + LOG.error(message); + throw new TrinoException(GRAVITINO_RUNTIME_ERROR, message, e); + } + } + + public void close() { + try { + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.error("Failed to close connection", e); + } + } +}