Skip to content

Commit

Permalink
[#5173] feat(hadoop-catalog): Support OSS fileset for Gravitino. (#5174)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Add OSS fileset support. 

### Why are the changes needed?

OSS is widely used in China and Asia. 

Fix: #5173 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Tested locally.
  • Loading branch information
yuqi1129 authored Oct 18, 2024
1 parent 639dfc8 commit 437f367
Show file tree
Hide file tree
Showing 15 changed files with 500 additions and 37 deletions.
2 changes: 2 additions & 0 deletions LICENSE.bin
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@
ApacheDS I18n
ApacheDS Protocol Kerberos Codec
Apache Hadoop
Apache Hadoop Aliyun connector
Apache Hadoop GCS connector
Apache Hadoop Annotations
Apache Hadoop Auth
Apache Hadoop Client Aggregator
Expand Down
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ tasks {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("authorization") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && it.name != "gcp-bundle"
it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && it.name != "gcp-bundle" && it.name != "aliyun-bundle"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
Expand All @@ -764,7 +764,8 @@ tasks {
!it.name.startsWith("integration-test") &&
!it.name.startsWith("flink") &&
!it.name.startsWith("trino-connector") &&
it.name != "hive-metastore-common" && it.name != "gcp-bundle"
it.name != "hive-metastore-common" && it.name != "gcp-bundle" &&
it.name != "aliyun-bundle"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
Expand Down
58 changes: 58 additions & 0 deletions bundles/aliyun-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar

// Plugins applied to the aliyun-bundle module: Maven publishing, Java compilation,
// and the Shadow plugin (resolved through the `libs` catalog) used below to build the bundled jar.
plugins {
`maven-publish`
id("java")
alias(libs.plugins.shadow)
}

// Dependencies of the Aliyun OSS bundle.
// The ShadowJar task below packages `runtimeClasspath`, so only the `implementation`
// entries end up inside the bundle; `compileOnly` entries are needed to compile but are not packaged.
dependencies {
// Compile-time only: provides the FileSystemProvider interface implemented by this bundle.
compileOnly(project(":catalogs:catalog-hadoop"))
// Compile-time only: Hadoop common classes (Configuration, FileSystem, Path).
compileOnly(libs.hadoop3.common)
// Hadoop OSS connector; packaged into the bundle.
implementation(libs.hadoop3.oss)

// oss needs StringUtils from commons-lang or the following error will occur in 3.1.0
// java.lang.NoClassDefFoundError: org/apache/commons/lang/StringUtils
// org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystemStore.initialize(AliyunOSSFileSystemStore.java:111)
// org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.initialize(AliyunOSSFileSystem.java:323)
// org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3611)
implementation(libs.commons.lang)
}

// Configures every ShadowJar task of this module, i.e. how the bundled (shaded) jar is assembled.
tasks.withType(ShadowJar::class.java) {
// Enable the Zip64 format so the archive is not constrained by classic zip limits.
isZip64 = true
// Package exactly the runtime classpath (the `implementation` dependencies declared above).
configurations = listOf(project.configurations.runtimeClasspath.get())
// Empty classifier: the shaded jar is published under the module's plain artifact name.
archiveClassifier.set("")
// Merge META-INF/services files coming from different dependencies instead of letting one overwrite another.
mergeServiceFiles()

// Relocate dependencies to avoid conflicts
relocate("org.jdom", "org.apache.gravitino.shaded.org.jdom")
relocate("org.apache.commons.lang", "org.apache.gravitino.shaded.org.apache.commons.lang")
}

// The regular `jar` task is made to depend on `shadowJar` and is given the "empty" classifier,
// so its output does not collide with the shaded jar, which uses the empty classifier above.
tasks.jar {
dependsOn(tasks.named("shadowJar"))
archiveClassifier.set("empty")
}

// Compilation waits for the catalog-hadoop `runtimeJars` task.
// NOTE(review): presumably this guarantees the catalog-hadoop jars referenced via `compileOnly`
// are built before this module compiles; confirm against the catalog-hadoop build script.
tasks.compileJava {
dependsOn(":catalogs:catalog-hadoop:runtimeJars")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.oss.fs;

import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;

/**
 * {@link FileSystemProvider} implementation that creates Hadoop {@link FileSystem} instances for
 * the {@code oss} scheme, backed by {@link AliyunOSSFileSystem}.
 *
 * <p>This class is registered through the {@code META-INF/services} file shipped in the same
 * bundle.
 */
public class OSSFileSystemProvider implements FileSystemProvider {

  /** Prefix marking catalog properties that must be handed to Hadoop without the prefix. */
  private static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";

  /** Hadoop configuration key that maps the {@code oss} scheme to an implementation class. */
  private static final String OSS_FILESYSTEM_IMPL = "fs.oss.impl";

  /**
   * Creates a new file system instance for the given OSS path.
   *
   * <p>Every entry of {@code config} is copied into a fresh Hadoop {@link Configuration}. Keys
   * starting with {@code gravitino.bypass.} have that leading prefix removed; all other keys are
   * copied unchanged.
   *
   * @param path the path whose URI identifies the file system to create
   * @param config the properties to pass to Hadoop
   * @return a new {@link FileSystem} instance for {@code path}
   * @throws IOException if the file system cannot be created
   */
  @Override
  public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
    Configuration configuration = new Configuration();
    config.forEach(
        (key, value) -> {
          // Strip the marker only when it is a real prefix; a plain replace() would also
          // mangle keys that merely contain the marker somewhere in the middle.
          String hadoopKey =
              key.startsWith(GRAVITINO_BYPASS_PREFIX)
                  ? key.substring(GRAVITINO_BYPASS_PREFIX.length())
                  : key;
          configuration.set(hadoopKey, value);
        });

    // Fall back to the Aliyun implementation when the caller did not configure one, so the
    // "oss" scheme can be resolved. An explicitly configured value is never overridden.
    if (configuration.get(OSS_FILESYSTEM_IMPL) == null) {
      configuration.set(OSS_FILESYSTEM_IMPL, AliyunOSSFileSystem.class.getName());
    }

    return AliyunOSSFileSystem.newInstance(path.toUri(), configuration);
  }

  /**
   * Returns the URI scheme handled by this provider.
   *
   * @return {@code "oss"}
   */
  @Override
  public String scheme() {
    return "oss";
  }

  /**
   * Returns the name of this provider, which the integration test in this change passes as the
   * catalog's file system provider.
   *
   * @return {@code "oss"}
   */
  @Override
  public String name() {
    return "oss";
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.gravitino.oss.fs.OSSFileSystemProvider
6 changes: 6 additions & 0 deletions bundles/gcp-bundle/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ tasks.withType(ShadowJar::class.java) {
isZip64 = true
configurations = listOf(project.configurations.runtimeClasspath.get())
archiveClassifier.set("")

// Relocate dependencies to avoid conflicts
relocate("org.apache.httpcomponents", "org.apache.gravitino.shaded.org.apache.httpcomponents")
relocate("org.apache.commons", "org.apache.gravitino.shaded.org.apache.commons")
relocate("com.google.guava", "org.apache.gravitino.shaded.com.google.guava")
relocate("com.google.code", "org.apache.gravitino.shaded.com.google.code")
}

tasks.jar {
Expand Down
1 change: 1 addition & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ dependencies {
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(project(":bundles:gcp-bundle"))
testImplementation(project(":bundles:aliyun-bundle"))

testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,21 @@ public class HadoopGCSCatalogIT extends HadoopCatalogIT {
public static final String BUCKET_NAME = "YOUR_BUCKET";
public static final String SERVICE_ACCOUNT_FILE = "YOUR_KEY_FILE";

@Override
public void startIntegrationTest() throws Exception {
// Do nothing.
}

@BeforeAll
public void setup() throws IOException {
copyBundleJarsToHadoop("gcp-bundle");

try {
super.startIntegrationTest();
} catch (Exception e) {
throw new RuntimeException(e);
}

metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.catalog.hadoop.integration.test;

import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Disabled(
"Disabled due to we don't have a real OSS account to test. If you have a GCP account,"
+ "please change the configuration(BUCKET_NAME, OSS_ACCESS_KEY, OSS_SECRET_KEY, OSS_ENDPOINT) and enable this test.")
public class HadoopOSSCatalogIT extends HadoopCatalogIT {
private static final Logger LOG = LoggerFactory.getLogger(HadoopOSSCatalogIT.class);
public static final String BUCKET_NAME = "YOUR_BUCKET";
public static final String OSS_ACCESS_KEY = "YOUR_OSS_ACCESS_KEY";
public static final String OSS_SECRET_KEY = "YOUR_OSS_SECRET_KEY";
public static final String OSS_ENDPOINT = "YOUR_OSS_ENDPOINT";

@VisibleForTesting
public void startIntegrationTest() throws Exception {}

@BeforeAll
public void setup() throws IOException {
copyBundleJarsToHadoop("aliyun-bundle");

try {
super.startIntegrationTest();
} catch (Exception e) {
throw new RuntimeException("Failed to start integration test", e);
}

metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake");
catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog");
schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema");

schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX);
Configuration conf = new Configuration();

conf.set("fs.oss.accessKeyId", OSS_ACCESS_KEY);
conf.set("fs.oss.accessKeySecret", OSS_SECRET_KEY);
conf.set("fs.oss.endpoint", OSS_ENDPOINT);
conf.set("fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
fileSystem = FileSystem.get(URI.create(String.format("oss://%s", BUCKET_NAME)), conf);

createMetalake();
createCatalog();
createSchema();
}

@AfterAll
public void stop() throws IOException {
Catalog catalog = metalake.loadCatalog(catalogName);
catalog.asSchemas().dropSchema(schemaName, true);
metalake.dropCatalog(catalogName);
client.dropMetalake(metalakeName);

try {
closer.close();
} catch (Exception e) {
LOG.error("Failed to close CloseableGroup", e);
}
}

protected String defaultBaseLocation() {
if (defaultBaseLocation == null) {
try {
Path bucket =
new Path(
String.format(
"oss://%s/%s",
BUCKET_NAME, GravitinoITUtils.genRandomName("CatalogFilesetIT")));
if (!fileSystem.exists(bucket)) {
fileSystem.mkdirs(bucket);
}

defaultBaseLocation = bucket.toString();
} catch (IOException e) {
throw new RuntimeException("Failed to create default base location", e);
}
}

return defaultBaseLocation;
}

protected void createCatalog() {
Map<String, String> map = Maps.newHashMap();
map.put("gravitino.bypass.fs.oss.accessKeyId", OSS_ACCESS_KEY);
map.put("gravitino.bypass.fs.oss.accessKeySecret", OSS_SECRET_KEY);
map.put("gravitino.bypass.fs.oss.endpoint", OSS_ENDPOINT);
map.put("gravitino.bypass.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
map.put(FILESYSTEM_PROVIDERS, "oss");

metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, "comment", map);

catalog = metalake.loadCatalog(catalogName);
}

protected String generateLocation(String filesetName) {
return String.format("%s/%s", defaultBaseLocation, filesetName);
}
}
1 change: 1 addition & 0 deletions clients/filesystem-hadoop3/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
testImplementation(project(":clients:client-java"))
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":bundles:gcp-bundle"))
testImplementation(project(":bundles:aliyun-bundle"))
testImplementation(libs.awaitility)
testImplementation(libs.bundles.jetty)
testImplementation(libs.bundles.jersey)
Expand Down
Loading

0 comments on commit 437f367

Please sign in to comment.