[#3193] feat(spark-connector): Support Iceberg RestCatalog in spark-connector (#3194)

### What changes were proposed in this pull request?
Support Iceberg `RestCatalog` in the spark-connector.
Add an IT for Iceberg `RestCatalog` with a HiveCatalog backend.
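
For orientation, here is a minimal sketch of how a Spark job could reach an Iceberg REST-backed catalog through this connector. The plugin class name comes from the diff below; the `spark.sql.gravitino.*` config keys and the catalog name `iceberg_rest` are assumptions for illustration, not part of this PR.

```java
// Hypothetical usage sketch, not code from this PR. The "spark.sql.gravitino.*"
// keys and the catalog name "iceberg_rest" are assumptions; the plugin class
// name is taken from the connector.
import org.apache.spark.sql.SparkSession;

public class RestCatalogUsage {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            .config(
                "spark.plugins",
                "com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin")
            .config("spark.sql.gravitino.uri", "http://127.0.0.1:8090") // assumed key
            .config("spark.sql.gravitino.metalake", "test") // assumed key
            .getOrCreate();

    // Catalogs registered in Gravitino, including an Iceberg catalog whose
    // backend is "rest", surface as ordinary Spark catalogs.
    spark.sql("SHOW NAMESPACES IN iceberg_rest").show(); // hypothetical catalog name
    spark.stop();
  }
}
```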

### Why are the changes needed?
Without this change, the spark-connector does not support Iceberg `RestCatalog`.

Fix: #3193

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New ITs.
caican00 authored and web-flow committed May 16, 2024
1 parent 3fa397a commit c98481d
Showing 7 changed files with 154 additions and 4 deletions.
2 changes: 2 additions & 0 deletions integration-test/build.gradle.kts
@@ -118,6 +118,8 @@ dependencies {
}
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
testImplementation("org.apache.kyuubi:kyuubi-spark-connector-hive_$scalaVersion:$kyuubiVersion")
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")

testImplementation(libs.okhttp3.loginterceptor)
testImplementation(libs.postgresql.driver)
SparkEnvIT.java
@@ -7,14 +7,19 @@

import com.datastrato.gravitino.Catalog;
import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.auxiliary.AuxiliaryServiceManager;
import com.datastrato.gravitino.client.GravitinoMetalake;
import com.datastrato.gravitino.integration.test.container.ContainerSuite;
import com.datastrato.gravitino.integration.test.container.HiveContainer;
import com.datastrato.gravitino.integration.test.util.AbstractIT;
import com.datastrato.gravitino.integration.test.util.spark.SparkUtilIT;
import com.datastrato.gravitino.server.web.JettyServerConfig;
import com.datastrato.gravitino.spark.connector.GravitinoSparkConfig;
import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.datastrato.gravitino.spark.connector.plugin.GravitinoSparkPlugin;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -30,9 +35,11 @@ public abstract class SparkEnvIT extends SparkUtilIT {
private static final Logger LOG = LoggerFactory.getLogger(SparkEnvIT.class);
private static final ContainerSuite containerSuite = ContainerSuite.getInstance();

protected static final String icebergRestServiceName = "iceberg-rest";
protected String hiveMetastoreUri = "thrift://127.0.0.1:9083";
protected String warehouse;
protected FileSystem hdfs;
protected String icebergRestServiceUri;

private final String metalakeName = "test";
private SparkSession sparkSession;
@@ -51,8 +58,15 @@ protected SparkSession getSparkSession() {
}

@BeforeAll
- void startUp() {
+ void startUp() throws Exception {
initHiveEnv();
// Initialize hiveMetastoreUri and warehouse first so that their values can be
// injected into the IcebergRestService configuration
if ("lakehouse-iceberg".equalsIgnoreCase(getProvider())) {
initIcebergRestServiceEnv();
}
// Start Gravitino server
AbstractIT.startIntegrationTest();
initHdfsFileSystem();
initGravitinoEnv();
initMetalakeAndCatalogs();
@@ -64,7 +78,7 @@ void startUp() {
}

@AfterAll
- void stop() {
+ void stop() throws IOException, InterruptedException {
if (hdfs != null) {
try {
hdfs.close();
@@ -75,8 +89,19 @@ void stop() {
if (sparkSession != null) {
sparkSession.close();
}
AbstractIT.stopIntegrationTest();
}

// AbstractIT#startIntegrationTest() is static, so we couldn't update the value of
// ignoreIcebergRestService if startIntegrationTest() were auto-invoked by JUnit.
// Override startIntegrationTest() and stopIntegrationTest() here to disable the
// automatic invocation by JUnit.
@BeforeAll
public static void startIntegrationTest() {}

@AfterAll
public static void stopIntegrationTest() {}

private void initMetalakeAndCatalogs() {
client.createMetalake(NameIdentifier.of(metalakeName), "", Collections.emptyMap());
GravitinoMetalake metalake = client.loadMetalake(NameIdentifier.of(metalakeName));
@@ -93,6 +118,7 @@ private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT; just construct the gravitino URI
int gravitinoPort = getGravitinoServerPort();
gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
icebergRestServiceUri = getIcebergRestServiceUri();
}

private void initHiveEnv() {
@@ -109,6 +135,30 @@ private void initHiveEnv() {
HiveContainer.HDFS_DEFAULTFS_PORT);
}

private void initIcebergRestServiceEnv() {
ignoreIcebergRestService = false;
Map<String, String> icebergRestServiceConfigs = new HashMap<>();
icebergRestServiceConfigs.put(
AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX
+ icebergRestServiceName
+ "."
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
icebergRestServiceConfigs.put(
AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX
+ icebergRestServiceName
+ "."
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
hiveMetastoreUri);
icebergRestServiceConfigs.put(
AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX
+ icebergRestServiceName
+ "."
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
warehouse);
AbstractIT.registerCustomConfigs(icebergRestServiceConfigs);
}

private void initHdfsFileSystem() {
Configuration conf = new Configuration();
conf.set(
@@ -139,4 +189,13 @@ private void initSparkEnv() {
.enableHiveSupport()
.getOrCreate();
}

private String getIcebergRestServiceUri() {
JettyServerConfig jettyServerConfig =
JettyServerConfig.fromConfig(
serverConfig,
AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX + icebergRestServiceName + ".");
return String.format(
"http://%s:%d/iceberg/", jettyServerConfig.getHost(), jettyServerConfig.getHttpPort());
}
}
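
The `initIcebergRestServiceEnv()` method above injects the Hive-backend settings into the embedded Iceberg REST service by prefixing each catalog property with the aux-service namespace. Below is a small sketch of the resulting key shape; the concrete prefix and property-name strings are assumptions based on Gravitino's documented configuration, since the IT references them only through constants.

```java
// Sketch of the aux-service key pattern used by initIcebergRestServiceEnv().
// "gravitino.auxService." is an assumed value for GRAVITINO_AUX_SERVICE_PREFIX,
// and "catalog-backend" an assumed value for GRAVITINO_ICEBERG_CATALOG_BACKEND.
public class AuxServiceKeySketch {
  static final String ASSUMED_PREFIX = "gravitino.auxService.";

  static String auxKey(String serviceName, String property) {
    return ASSUMED_PREFIX + serviceName + "." + property;
  }

  public static void main(String[] args) {
    // Prints: gravitino.auxService.iceberg-rest.catalog-backend
    System.out.println(auxKey("iceberg-rest", "catalog-backend"));
  }
}
```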
SparkIcebergCatalogHiveBackendIT.java
@@ -10,14 +10,17 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInstance;

/** This class uses Iceberg HiveCatalog as the backend catalog. */
@Tag("gravitino-docker-it")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkIcebergCatalogHiveBackendIT extends SparkIcebergCatalogIT {

@Override
protected Map<String, String> getCatalogConfigs() {
Map<String, String> catalogProperties = Maps.newHashMap();
- catalogProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND, "hive");
+ catalogProperties.put(
+     IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+     IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
catalogProperties.put(
SparkIcebergCatalogRestBackendIT.java (new file)
@@ -0,0 +1,31 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.integration.test.spark.iceberg;

import com.datastrato.gravitino.spark.connector.iceberg.IcebergPropertiesConstants;
import com.google.common.collect.Maps;
import java.util.Map;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInstance;

/** This class uses Iceberg RESTCatalog for testing; the real backend catalog is HiveCatalog. */
@Tag("gravitino-docker-it")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SparkIcebergCatalogRestBackendIT extends SparkIcebergCatalogIT {

@Override
protected Map<String, String> getCatalogConfigs() {
Map<String, String> catalogProperties = Maps.newHashMap();
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, icebergRestServiceUri);
catalogProperties.put(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);

return catalogProperties;
}
}
IcebergPropertiesConstants.java
@@ -38,11 +38,18 @@ public class IcebergPropertiesConstants {
static final String ICEBERG_CATALOG_JDBC_PASSWORD =
IcebergCatalogPropertiesMetadata.ICEBERG_JDBC_PASSWORD;

+ @VisibleForTesting
+ public static final String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;

static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_HIVE = "hive";
- static final String ICEBERG_CATALOG_BACKEND_HIVE = CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE;

static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";
static final String ICEBERG_CATALOG_BACKEND_JDBC = "jdbc";

+ @VisibleForTesting
+ public static final String ICEBERG_CATALOG_BACKEND_REST = CatalogUtil.ICEBERG_CATALOG_TYPE_REST;

+ static final String GRAVITINO_ICEBERG_CATALOG_BACKEND_REST = "rest";

private IcebergPropertiesConstants() {}
}
IcebergPropertiesConverter.java
@@ -44,6 +44,9 @@ public Map<String, String> toSparkCatalogProperties(Map<String, String> properties) {
case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_JDBC:
initJdbcProperties(properties, all);
break;
case IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND_REST:
initRestProperties(properties, all);
break;
default:
// SparkCatalog does not support Memory type catalog
throw new IllegalArgumentException(
@@ -123,4 +126,25 @@ private void initJdbcProperties(
icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_USER, jdbcUser);
icebergProperties.put(IcebergPropertiesConstants.ICEBERG_CATALOG_JDBC_PASSWORD, jdbcPassword);
}

private void initRestProperties(
Map<String, String> gravitinoProperties, HashMap<String, String> icebergProperties) {
String restUri =
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI);
Preconditions.checkArgument(
StringUtils.isNotBlank(restUri),
"Couldn't get "
+ IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI
+ " from Iceberg Catalog properties");
icebergProperties.put(
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
icebergProperties.put(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, restUri);
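// Note: GRAVITINO_ICEBERG_CATALOG_URI and ICEBERG_CATALOG_URI must share the same
// key string ("uri") for this to match the unit test below, which asserts the
// ICEBERG_CATALOG_URI key.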
if (gravitinoProperties.containsKey(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE)) {
icebergProperties.put(
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
gravitinoProperties.get(IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE));
}
}
}
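
For context, the map produced by `initRestProperties` corresponds to Iceberg's standard `SparkCatalog` options (`type`, `uri`, `warehouse`). Configured by hand, without Gravitino in the middle, the equivalent Spark setup would look roughly like this; the catalog name and endpoint values are placeholders.

```java
// Hand-written equivalent of the converted REST properties; a sketch, not code
// from this PR. The option keys ("type", "uri", "warehouse") are Iceberg's
// documented SparkCatalog options; the catalog name "rest_backed" is hypothetical.
import org.apache.spark.sql.SparkSession;

public class RestCatalogSparkConf {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.catalog.rest_backed", "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog.rest_backed.type", "rest")
            .config("spark.sql.catalog.rest_backed.uri", "http://127.0.0.1:9001/iceberg/")
            .config(
                "spark.sql.catalog.rest_backed.warehouse",
                "hdfs://127.0.0.1:9000/user/hive/warehouse")
            .getOrCreate();

    spark.sql("SHOW NAMESPACES IN rest_backed").show();
    spark.stop();
  }
}
```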
@@ -69,4 +69,28 @@ void testCatalogPropertiesWithJdbcBackend() {
"passwd"),
properties);
}

@Test
void testCatalogPropertiesWithRestBackend() {
Map<String, String> properties =
icebergPropertiesConverter.toSparkCatalogProperties(
ImmutableMap.of(
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
"rest-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse",
"key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE,
IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST,
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"rest-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse"),
properties);
}
}
