[#5074] feat(hadoop-catalog): Support GCS fileset. (#5079)
### What changes were proposed in this pull request?

1. Add a `gcp-bundle` jar that packages the Hadoop GCS connector.
2. Support GCS in the Hadoop catalog.

### Why are the changes needed?

Users have a strong demand for fileset support on GCS storage.

Fix: #5074 

### Does this PR introduce _any_ user-facing change?

N/A

### How was this patch tested?

Manually; see `HadoopGCPCatalogIT`.
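
For reference, a minimal usage sketch (not part of this commit) of how the new support could be exercised through the Java client. The endpoint, metalake, schema, bucket, and credential property key are assumptions; the call shapes mirror the integration tests in this diff:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.client.GravitinoClient;
import org.apache.gravitino.file.Fileset;

public class GcsFilesetUsageSketch {
  public static void main(String[] args) {
    GravitinoClient client =
        GravitinoClient.builder("http://localhost:8090") // hypothetical server
            .withMetalake("demo_metalake") // hypothetical metalake
            .build();

    Catalog catalog =
        client.createCatalog(
            "gcs_catalog",
            Catalog.Type.FILESET,
            "hadoop",
            "fileset catalog backed by GCS",
            ImmutableMap.of(
                // "gravitino.bypass."-prefixed keys are forwarded to the Hadoop
                // Configuration with the prefix stripped (see GCSFileSystemProvider
                // in this diff); the key name itself is an assumption.
                "gravitino.bypass.fs.gs.auth.service.account.json.keyfile",
                "/path/to/service-account.json"));

    // Assumes the schema "events_schema" already exists (see createSchema in the IT).
    catalog
        .asFilesetCatalog()
        .createFileset(
            NameIdentifier.of("events_schema", "raw_events"), // hypothetical names
            "raw event files on GCS",
            Fileset.Type.MANAGED,
            "gs://my-bucket/raw_events", // hypothetical bucket
            ImmutableMap.of());
  }
}
```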
yuqi1129 authored Oct 17, 2024
1 parent 075a851 commit 93cdbc2
Showing 17 changed files with 521 additions and 67 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
@@ -745,7 +745,7 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("authorization") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink")
it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && it.name != "gcp-bundle"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
@@ -764,7 +764,7 @@ tasks {
!it.name.startsWith("integration-test") &&
!it.name.startsWith("flink") &&
!it.name.startsWith("trino-connector") &&
it.name != "hive-metastore-common"
it.name != "hive-metastore-common" && it.name != "gcp-bundle"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
22 changes: 22 additions & 0 deletions bundles/build.gradle.kts
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

tasks.all {
enabled = false
}
46 changes: 46 additions & 0 deletions bundles/gcp-bundle/build.gradle.kts
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

plugins {
`maven-publish`
id("java")
alias(libs.plugins.shadow)
}

dependencies {
compileOnly(project(":catalogs:catalog-hadoop"))
compileOnly(libs.hadoop3.common)
implementation(libs.hadoop3.gcs)
}

tasks.withType(ShadowJar::class.java) {
isZip64 = true
configurations = listOf(project.configurations.runtimeClasspath.get())
archiveClassifier.set("")
}

tasks.jar {
dependsOn(tasks.named("shadowJar"))
archiveClassifier.set("empty")
}

tasks.compileJava {
dependsOn(":catalogs:catalog-hadoop:runtimeJars")
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.gcs.fs;

import com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GCSFileSystemProvider implements FileSystemProvider {
@Override
public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
Configuration configuration = new Configuration();
config.forEach(
(k, v) -> {
configuration.set(k.replace("gravitino.bypass.", ""), v);
});

return GoogleHadoopFileSystem.newInstance(path.toUri(), configuration);
}

@Override
public String scheme() {
return "gs";
}

@Override
public String name() {
return "gcs";
}
}
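
A minimal sketch (not part of this commit) of invoking the new provider directly; the bucket and credential path are hypothetical. The point to note is that `gravitino.bypass.`-prefixed keys are stripped before they reach the Hadoop `Configuration`:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.gcs.fs.GCSFileSystemProvider;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class GcsProviderSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> config =
        ImmutableMap.of(
            // Arrives in the Configuration as "fs.gs.auth.service.account.json.keyfile";
            // the key name is an assumption, not taken from this commit.
            "gravitino.bypass.fs.gs.auth.service.account.json.keyfile",
            "/path/to/service-account.json");

    try (FileSystem fs =
        new GCSFileSystemProvider()
            .getFileSystem(new Path("gs://my-bucket/fileset"), config)) { // hypothetical bucket
      System.out.println(fs.getUri()); // e.g. gs://my-bucket
    }
  }
}
```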
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.gravitino.gcs.fs.GCSFileSystemProvider
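
The file above registers the provider through Java's standard `ServiceLoader` mechanism; a minimal discovery sketch (illustrative only, not the catalog's actual loading code):

```java
import java.util.ServiceLoader;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;

public class ProviderDiscoverySketch {
  public static void main(String[] args) {
    // Scans every META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
    // file on the classpath; with the gcp-bundle jar present, one entry is the
    // GCSFileSystemProvider registered above (name "gcs", scheme "gs").
    for (FileSystemProvider provider : ServiceLoader.load(FileSystemProvider.class)) {
      System.out.printf("name=%s, scheme=%s%n", provider.name(), provider.scheme());
    }
  }
}
```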
2 changes: 2 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -73,6 +73,7 @@ dependencies {
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":bundles:gcp-bundle"))

testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)
@@ -86,6 +87,7 @@ dependencies {
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.hadoop3.gcs)

testRuntimeOnly(libs.junit.jupiter.engine)
}
@@ -27,7 +27,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class HDFSFileSystemProvider implements FileSystemProvider {

@@ -39,7 +38,7 @@ public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String>
(k, v) -> {
configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
});
return DistributedFileSystem.newInstance(path.toUri(), configuration);
return FileSystem.newInstance(path.toUri(), configuration);
}

@Override
@@ -25,7 +25,6 @@
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class LocalFileSystemProvider implements FileSystemProvider {
@@ -38,7 +37,7 @@ public FileSystem getFileSystem(Path path, Map<String, String> config) throws IO
configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
});

return LocalFileSystem.newInstance(path.toUri(), configuration);
return FileSystem.newInstance(path.toUri(), configuration);
}

@Override
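
With this change, both `HDFSFileSystemProvider` and `LocalFileSystemProvider` delegate to the generic `FileSystem.newInstance`, which resolves the concrete class from the path's URI scheme instead of hard-coding `DistributedFileSystem` or `LocalFileSystem`. A minimal sketch of that scheme-based dispatch:

```java
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SchemeDispatchSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // FileSystem.newInstance looks up the implementation registered for the URI
    // scheme (fs.file.impl, fs.hdfs.impl, fs.gs.impl, ...), so the same provider
    // code path can serve any Hadoop-compatible filesystem.
    try (FileSystem local = FileSystem.newInstance(URI.create("file:///tmp"), conf)) {
      System.out.println(local.getClass().getName()); // org.apache.hadoop.fs.LocalFileSystem
    }
  }
}
```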
@@ -59,26 +59,24 @@
@Tag("gravitino-docker-test")
public class HadoopCatalogIT extends BaseIT {
private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogIT.class);
private static final ContainerSuite containerSuite = ContainerSuite.getInstance();
protected static final ContainerSuite containerSuite = ContainerSuite.getInstance();

public static final String metalakeName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
public static final String catalogName =
GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
public String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
public String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema";
public static final String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
private static final String provider = "hadoop";
private static GravitinoMetalake metalake;
private static Catalog catalog;
private static FileSystem hdfs;
private static String defaultBaseLocation;
public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
protected static final String provider = "hadoop";
protected static GravitinoMetalake metalake;
protected static Catalog catalog;
protected static FileSystem fileSystem;
protected static String defaultBaseLocation;

@BeforeAll
public void setup() throws IOException {
containerSuite.startHiveContainer();
Configuration conf = new Configuration();
conf.set("fs.defaultFS", defaultBaseLocation());
hdfs = FileSystem.get(conf);
fileSystem = FileSystem.get(conf);

createMetalake();
createCatalog();
@@ -91,8 +89,8 @@ public void stop() throws IOException {
catalog.asSchemas().dropSchema(schemaName, true);
metalake.dropCatalog(catalogName);
client.dropMetalake(metalakeName);
if (hdfs != null) {
hdfs.close();
if (fileSystem != null) {
fileSystem.close();
}

try {
@@ -102,7 +100,7 @@ }
}
}

private void createMetalake() {
protected void createMetalake() {
GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes();
Assertions.assertEquals(0, gravitinoMetalakes.length);

@@ -114,14 +112,14 @@ private void createMetalake() {
metalake = loadMetalake;
}

private void createCatalog() {
protected void createCatalog() {
metalake.createCatalog(
catalogName, Catalog.Type.FILESET, provider, "comment", ImmutableMap.of());

catalog = metalake.loadCatalog(catalogName);
}

private void createSchema() {
protected void createSchema() {
Map<String, String> properties = Maps.newHashMap();
properties.put("key1", "val1");
properties.put("key2", "val2");
@@ -137,7 +135,7 @@ private void createSchema() {
Assertions.assertNotNull(loadSchema.properties().get("location"));
}

private static void dropSchema() {
private void dropSchema() {
catalog.asSchemas().dropSchema(schemaName, true);
Assertions.assertFalse(catalog.asSchemas().schemaExists(schemaName));
}
@@ -171,7 +169,7 @@ public void testCreateFileset() throws IOException {
String filesetName = "test_create_fileset";
String storageLocation = storageLocation(filesetName);
Assertions.assertFalse(
hdfs.exists(new Path(storageLocation)), "storage location should not exists");
fileSystem.exists(new Path(storageLocation)), "storage location should not exists");
Fileset fileset =
createFileset(
filesetName,
@@ -242,7 +240,7 @@ public void testCreateFilesetWithChinese() throws IOException {
String filesetName = "test_create_fileset_with_chinese";
String storageLocation = storageLocation(filesetName) + "/中文目录test";
Assertions.assertFalse(
hdfs.exists(new Path(storageLocation)), "storage location should not exists");
fileSystem.exists(new Path(storageLocation)), "storage location should not exists");
Fileset fileset =
createFileset(
filesetName,
@@ -285,7 +283,7 @@ public void testExternalFileset() throws IOException {
Assertions.assertEquals(1, fileset.properties().size());
Assertions.assertEquals("v1", fileset.properties().get("k1"));
Assertions.assertTrue(
hdfs.exists(new Path(storageLocation)), "storage location should be created");
fileSystem.exists(new Path(storageLocation)), "storage location should be created");

// create fileset with storage location that not exist
String filesetName2 = "test_external_fileset_no_exist";
@@ -349,7 +347,7 @@ public void testDropManagedFileset() throws IOException {
String storageLocation = storageLocation(filesetName);

Assertions.assertFalse(
hdfs.exists(new Path(storageLocation)), "storage location should not exists");
fileSystem.exists(new Path(storageLocation)), "storage location should not exists");

createFileset(
filesetName, "comment", Fileset.Type.MANAGED, storageLocation, ImmutableMap.of("k1", "v1"));
@@ -365,7 +363,7 @@ public void testDropManagedFileset() throws IOException {
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, filesetName)),
"fileset should not be exists");
Assertions.assertFalse(
hdfs.exists(new Path(storageLocation)), "storage location should be dropped");
fileSystem.exists(new Path(storageLocation)), "storage location should be dropped");
}

@Test
@@ -392,7 +390,7 @@ public void testDropExternalFileset() throws IOException {
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, filesetName)),
"fileset should not be exists");
Assertions.assertTrue(
hdfs.exists(new Path(storageLocation)), "storage location should not be dropped");
fileSystem.exists(new Path(storageLocation)), "storage location should not be dropped");
}

@Test
@@ -688,7 +686,7 @@ public void testGetFileLocationWithInvalidAuditHeaders() {
}
}

private static String generateLocation(String filesetName) {
protected String generateLocation(String filesetName) {
return String.format(
"hdfs://%s:%d/user/hadoop/%s/%s/%s",
containerSuite.getHiveContainer().getContainerIpAddress(),
@@ -707,7 +705,7 @@ private Fileset createFileset(
if (storageLocation != null) {
Path location = new Path(storageLocation);
try {
hdfs.deleteOnExit(location);
fileSystem.deleteOnExit(location);
} catch (IOException e) {
LOG.warn("Failed to delete location: {}", location, e);
}
@@ -724,10 +722,11 @@ private void assertFilesetExists(String filesetName) throws IOException {
catalog.asFilesetCatalog().filesetExists(NameIdentifier.of(schemaName, filesetName)),
"fileset should be exists");
Assertions.assertTrue(
hdfs.exists(new Path(storageLocation(filesetName))), "storage location should be exists");
fileSystem.exists(new Path(storageLocation(filesetName))),
"storage location should be exists");
}

private static String defaultBaseLocation() {
protected String defaultBaseLocation() {
if (defaultBaseLocation == null) {
defaultBaseLocation =
String.format(
@@ -739,7 +738,7 @@ private static String defaultBaseLocation() {
return defaultBaseLocation;
}

private static String storageLocation(String filesetName) {
private String storageLocation(String filesetName) {
return defaultBaseLocation() + "/" + filesetName;
}
}
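
The visibility changes above (private static members becoming protected instance members) let storage-specific ITs subclass this test; a hypothetical, heavily simplified sketch of what `HadoopGCPCatalogIT` might look like on top of it (bucket and credential key are assumptions):

```java
// Assumes the same package as HadoopCatalogIT above.
import com.google.common.collect.ImmutableMap;
import org.apache.gravitino.Catalog;

public class HadoopGCSCatalogITSketch extends HadoopCatalogIT {
  @Override
  protected String defaultBaseLocation() {
    return "gs://my-test-bucket/" + schemaName; // hypothetical bucket
  }

  @Override
  protected void createCatalog() {
    metalake.createCatalog(
        catalogName,
        Catalog.Type.FILESET,
        provider,
        "comment",
        ImmutableMap.of(
            // Key name is an assumption; "gravitino.bypass."-prefixed keys are
            // forwarded to the Hadoop Configuration by GCSFileSystemProvider.
            "gravitino.bypass.fs.gs.auth.service.account.json.keyfile",
            "/path/to/service-account.json"));
    catalog = metalake.loadCatalog(catalogName);
  }
}
```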