From f69bdaf155ac82fdccbc443fb8d07ffd5d8b81e7 Mon Sep 17 00:00:00 2001 From: XiaoZ <57973980+xiaozcy@users.noreply.github.com> Date: Mon, 21 Oct 2024 19:15:09 +0800 Subject: [PATCH] [#3379] feat(catalog-hadoop): Add S3 support for Fileset Hadoop catalog (#4232) ### What changes were proposed in this pull request? Add S3 support for the Fileset Hadoop catalog. The production change is essentially just the new hadoop-aws dependency; most of the work is the integration tests that exercise it. ### Why are the changes needed? Fix: #3379 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Integration tests (IT). --------- Co-authored-by: zhanghan18 Co-authored-by: yuqi --- LICENSE.bin | 1 + build.gradle.kts | 7 +- bundles/aws-bundle/build.gradle.kts | 46 ++++ .../gravitino/s3/fs/S3FileSystemProvider.java | 51 +++++ ...itino.catalog.hadoop.fs.FileSystemProvider | 20 ++ catalogs/catalog-hadoop/build.gradle.kts | 6 + .../integration/test/HadoopCatalogIT.java | 23 +- .../integration/test/HadoopS3CatalogIT.java | 189 ++++++++++++++++ catalogs/catalog-hive/build.gradle.kts | 2 +- clients/filesystem-hadoop3/build.gradle.kts | 6 + .../test/GravitinoVirtualFileSystemIT.java | 5 +- .../test/GravitinoVirtualFileSystemOSSIT.java | 36 +--- .../test/GravitinoVirtualFileSystemS3IT.java | 203 ++++++++++++++++++ gradle/libs.versions.toml | 3 +- .../integration/test/util/HttpUtils.java | 2 +- settings.gradle.kts | 1 + 16 files changed, 553 insertions(+), 48 deletions(-) create mode 100644 bundles/aws-bundle/build.gradle.kts create mode 100644 bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java create mode 100644 bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider create mode 100644 catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java create mode 100644 clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java diff --git a/LICENSE.bin b/LICENSE.bin index ee65d4d6952..e922f936771 100644 --- a/LICENSE.bin +++ b/LICENSE.bin @@ -284,6 +284,7 @@ Apache Hadoop Apache Hadoop Aliyun connector Apache Hadoop GCS connector + Apache Hadoop AWS connector Apache Hadoop Annotatations Apache Hadoop Auth Apache Hadoop Client Aggregator diff --git a/build.gradle.kts b/build.gradle.kts index 61ce878575f..b954aaf10e7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -746,7 +746,9 @@ tasks { if (!it.name.startsWith("catalog") && !it.name.startsWith("authorization") && !it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark") && !it.name.startsWith("iceberg") && it.name != "trino-connector" && - it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && it.name != "gcp-bundle" && it.name != "aliyun-bundle" + it.name != "integration-test" && it.name != "bundled-catalog" && !it.name.startsWith("flink") && + it.name != "integration-test" && it.name != "hive-metastore-common" && !it.name.startsWith("flink") && + it.name != "gcp-bundle" && it.name != "aliyun-bundle" && it.name != "aws-bundle" ) { from(it.configurations.runtimeClasspath) into("distribution/package/libs") @@ -765,8 +767,9 @@ tasks { !it.name.startsWith("integration-test") && !it.name.startsWith("flink") && !it.name.startsWith("trino-connector") && + it.name != "bundled-catalog" && it.name != "hive-metastore-common" && it.name != "gcp-bundle" && - it.name != "aliyun-bundle" + it.name != 
"aliyun-bundle" && it.name != "aws-bundle" ) { dependsOn("${it.name}:build") from("${it.name}/build/libs") diff --git a/bundles/aws-bundle/build.gradle.kts b/bundles/aws-bundle/build.gradle.kts new file mode 100644 index 00000000000..741bdc414e1 --- /dev/null +++ b/bundles/aws-bundle/build.gradle.kts @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar + +plugins { + `maven-publish` + id("java") + alias(libs.plugins.shadow) +} + +dependencies { + compileOnly(project(":catalogs:catalog-hadoop")) + compileOnly(libs.hadoop3.common) + implementation(libs.hadoop3.aws) +} + +tasks.withType(ShadowJar::class.java) { + isZip64 = true + configurations = listOf(project.configurations.runtimeClasspath.get()) + archiveClassifier.set("") +} + +tasks.jar { + dependsOn(tasks.named("shadowJar")) + archiveClassifier.set("empty") +} + +tasks.compileJava { + dependsOn(":catalogs:catalog-hadoop:runtimeJars") +} diff --git a/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java new file mode 100644 index 00000000000..4ab1ca24212 --- /dev/null +++ b/bundles/aws-bundle/src/main/java/org/apache/gravitino/s3/fs/S3FileSystemProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.gravitino.s3.fs; + import java.io.IOException; + import java.util.Map; + import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.s3a.S3AFileSystem; + + public class S3FileSystemProvider implements FileSystemProvider { + @Override + public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException { + Configuration configuration = new Configuration(); + // Strip the Gravitino prefix so the raw Hadoop/S3A keys reach the Configuration. + config.forEach( + (k, v) -> { + configuration.set(k.replace("gravitino.bypass.", ""), v); + }); + + return S3AFileSystem.newInstance(path.toUri(), configuration); + } + + @Override + public String scheme() { + return "s3a"; + } + + @Override + public String name() { + return "s3"; + } +} diff --git a/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider b/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider new file mode 100644 index 00000000000..37a1a84c7ee --- /dev/null +++ b/bundles/aws-bundle/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +org.apache.gravitino.s3.fs.S3FileSystemProvider \ No newline at end of file diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts index 4c091b14946..62a48656c39 100644 --- a/catalogs/catalog-hadoop/build.gradle.kts +++ b/catalogs/catalog-hadoop/build.gradle.kts @@ -73,6 +73,7 @@ dependencies { testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":server")) testImplementation(project(":server-common")) + testImplementation(project(":bundles:aws-bundle")) testImplementation(project(":bundles:gcp-bundle")) testImplementation(project(":bundles:aliyun-bundle")) @@ -161,6 +162,11 @@ tasks.test { } else { dependsOn(tasks.jar) } + + // this task depends on :bundles:aws-bundle:jar + dependsOn(":bundles:aws-bundle:jar") + dependsOn(":bundles:aliyun-bundle:jar") + dependsOn(":bundles:gcp-bundle:jar") } tasks.getByName("generateMetadataFileForMavenJavaPublication") { diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java index 49bd29b2ee7..b272bd7a889 100644 --- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java +++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java @@ -61,19 +61,24 @@ public class HadoopCatalogIT extends BaseIT { private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogIT.class); protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); - public String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake"); - public String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog"); - public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema"; - public String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + protected String metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake"); + protected String catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog"); + public final String SCHEMA_PREFIX = "CatalogFilesetIT_schema"; + protected String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); protected static final String provider = "hadoop"; - protected static GravitinoMetalake metalake; - protected static Catalog catalog; - protected static FileSystem fileSystem; - protected static String defaultBaseLocation; + protected GravitinoMetalake metalake; + protected Catalog catalog; + protected FileSystem fileSystem; + protected String defaultBaseLocation; + + protected void startNecessaryContainer() { + containerSuite.startHiveContainer(); + } @BeforeAll public void setup() throws IOException { - containerSuite.startHiveContainer(); + startNecessaryContainer(); + Configuration conf = new Configuration(); conf.set("fs.defaultFS", defaultBaseLocation()); fileSystem = FileSystem.get(conf); diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java new file mode 100644 index 00000000000..bac39b7b83c --- /dev/null +++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/integration/test/HadoopS3CatalogIT.java @@ -0,0 +1,189 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.catalog.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +@Tag("gravitino-docker-test") +public class HadoopS3CatalogIT extends HadoopCatalogIT { + private static final Logger LOG = LoggerFactory.getLogger(HadoopS3CatalogIT.class); + private String bucketName = "s3-bucket-" + UUID.randomUUID().toString().replace("-", ""); + private String accessKey; + private String secretKey; + private String s3Endpoint; + + private GravitinoLocalStackContainer gravitinoLocalStackContainer; + + // Do nothing here: setup() starts the test environment itself, after the aws-bundle jar is copied. + @VisibleForTesting + public void startIntegrationTest() throws Exception {} + + @Override + protected void startNecessaryContainer() { + + containerSuite.startLocalStackContainer(); + gravitinoLocalStackContainer = containerSuite.getLocalStackContainer(); + + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until( + () -> { + try { + Container.ExecResult result = + gravitinoLocalStackContainer.executeInContainer( + "awslocal", "iam", "create-user", "--user-name", "anonymous"); + return result.getExitCode() == 0; + } catch (Exception e) { + LOG.info("LocalStack is not ready yet: ", e); + return false; + } + }); + + gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb", "s3://" + bucketName); + + Container.ExecResult result = + gravitinoLocalStackContainer.executeInContainer( + "awslocal", "iam", "create-access-key", "--user-name", "anonymous"); + + gravitinoLocalStackContainer.executeInContainer( + "awslocal", + "s3api", + "put-bucket-acl", + "--bucket", + bucketName, + "--acl", + "public-read-write"); + + // Get access key and secret key from result + String[] lines = result.getStdout().split("\n"); + accessKey = lines[3].split(":")[1].trim().substring(1, 21); + secretKey = lines[5].split(":")[1].trim().substring(1, 41); + 
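+    // Note: the fixed substring offsets above assume the awslocal plain-text output layout
+    // (access key id on the 4th line, secret on the 6th); AWS access key ids are 20 characters
+    // and secret access keys 40, hence the bounds.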
LOG.info("Access key: " + accessKey); + LOG.info("Secret key: " + secretKey); + + s3Endpoint = + String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566); + } + + @BeforeAll + public void setup() throws IOException { + copyBundleJarsToHadoop("aws-bundle"); + + try { + super.startIntegrationTest(); + } catch (Exception e) { + throw new RuntimeException("Failed to start integration test", e); + } + + startNecessaryContainer(); + + metalakeName = GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake"); + catalogName = GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog"); + schemaName = GravitinoITUtils.genRandomName("CatalogFilesetIT_schema"); + + schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + Configuration conf = new Configuration(); + + conf.set("fs.s3a.access.key", accessKey); + conf.set("fs.s3a.secret.key", secretKey); + conf.set("fs.s3a.endpoint", s3Endpoint); + conf.set( + "fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); + fileSystem = FileSystem.get(URI.create(String.format("s3a://%s", bucketName)), conf); + + createMetalake(); + createCatalog(); + createSchema(); + } + + @AfterAll + public void stop() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName); + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Failed to close CloseableGroup", e); + } + } + + protected String defaultBaseLocation() { + if (defaultBaseLocation == null) { + try { + Path bucket = + new Path( + String.format( + "s3a://%s/%s", bucketName, GravitinoITUtils.genRandomName("CatalogFilesetIT"))); + if (!fileSystem.exists(bucket)) { + fileSystem.mkdirs(bucket); + } + + defaultBaseLocation = bucket.toString(); + } catch (IOException e) { + throw new RuntimeException("Failed to create default base location", e); + } + } + + return defaultBaseLocation; + } + + protected void createCatalog() { + Map map = Maps.newHashMap(); + map.put("gravitino.bypass.fs.s3a.access.key", accessKey); + map.put("gravitino.bypass.fs.s3a.secret.key", secretKey); + map.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint); + map.put( + "gravitino.bypass.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); + map.put(FILESYSTEM_PROVIDERS, "s3"); + + metalake.createCatalog(catalogName, Catalog.Type.FILESET, provider, "comment", map); + + catalog = metalake.loadCatalog(catalogName); + } + + protected String generateLocation(String filesetName) { + return String.format("%s/%s", defaultBaseLocation, filesetName); + } +} diff --git a/catalogs/catalog-hive/build.gradle.kts b/catalogs/catalog-hive/build.gradle.kts index aca8959df13..f7d6e60c14d 100644 --- a/catalogs/catalog-hive/build.gradle.kts +++ b/catalogs/catalog-hive/build.gradle.kts @@ -128,7 +128,7 @@ dependencies { testImplementation(libs.testcontainers) testImplementation(libs.testcontainers.mysql) testImplementation(libs.testcontainers.localstack) - testImplementation(libs.hadoop2.s3) + testImplementation(libs.hadoop2.aws) testRuntimeOnly(libs.junit.jupiter.engine) } diff --git a/clients/filesystem-hadoop3/build.gradle.kts b/clients/filesystem-hadoop3/build.gradle.kts index c3f8c6d7bcf..7f21c700d6e 100644 --- a/clients/filesystem-hadoop3/build.gradle.kts +++ b/clients/filesystem-hadoop3/build.gradle.kts @@ -41,6 +41,7 @@ dependencies { 
testImplementation(project(":integration-test-common", "testArtifacts")) testImplementation(project(":bundles:gcp-bundle")) testImplementation(project(":bundles:aliyun-bundle")) + testImplementation(project(":bundles:aws-bundle")) testImplementation(libs.awaitility) testImplementation(libs.bundles.jetty) testImplementation(libs.bundles.jersey) @@ -89,6 +90,11 @@ tasks.test { } else { dependsOn(":catalogs:catalog-hadoop:jar", ":catalogs:catalog-hadoop:runtimeJars") } + + // this task depends on :bundles:aws-bundle:jar + dependsOn(":bundles:aws-bundle:jar") + dependsOn(":bundles:aliyun-bundle:jar") + dependsOn(":bundles:gcp-bundle:jar") } tasks.javadoc { diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java index 064643b7931..b971ab918d2 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemIT.java @@ -56,13 +56,14 @@ @Tag("gravitino-docker-test") public class GravitinoVirtualFileSystemIT extends BaseIT { private static final Logger LOG = LoggerFactory.getLogger(GravitinoVirtualFileSystemIT.class); - private static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); protected String metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); protected String catalogName = GravitinoITUtils.genRandomName("catalog"); protected String schemaName = GravitinoITUtils.genRandomName("schema"); protected GravitinoMetalake metalake; protected Configuration conf = new Configuration(); protected int defaultBockSize = 128 * 1024 * 1024; + protected int defaultReplication = 3; @BeforeAll public void startUp() throws Exception { @@ -459,7 +460,7 @@ public void testGetDefaultReplications() throws IOException { Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent)); Path gvfsPath = genGvfsPath(filesetName); try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) { - assertEquals(3, gvfs.getDefaultReplication(gvfsPath)); + assertEquals(defaultReplication, gvfs.getDefaultReplication(gvfsPath)); } catalog.asFilesetCatalog().dropFileset(filesetIdent); diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java index 67c76be3db5..6a6557c6c55 100644 --- a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemOSSIT.java @@ -21,25 +21,18 @@ import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS; -import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.common.collect.Maps; import java.io.IOException; import java.util.Collections; -import 
java.util.HashMap; import java.util.Map; import org.apache.gravitino.Catalog; -import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.file.Fileset; import org.apache.gravitino.integration.test.util.GravitinoITUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +61,9 @@ public void startUp() throws Exception { // This value can be by tune by the user, please change it accordingly. defaultBockSize = 64 * 1024 * 1024; + // The default replication factor is 1. + defaultReplication = 1; + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); catalogName = GravitinoITUtils.genRandomName("catalog"); schemaName = GravitinoITUtils.genRandomName("schema"); @@ -111,7 +107,7 @@ public void startUp() throws Exception { public void tearDown() throws IOException { Catalog catalog = metalake.loadCatalog(catalogName); catalog.asSchemas().dropSchema(schemaName, true); - metalake.dropCatalog(catalogName); + metalake.dropCatalog(catalogName, true); client.dropMetalake(metalakeName); if (client != null) { @@ -145,30 +141,6 @@ protected String genStorageLocation(String fileset) { return String.format("oss://%s/%s", BUCKET_NAME, fileset); } - @Test - public void testGetDefaultReplications() throws IOException { - String filesetName = GravitinoITUtils.genRandomName("test_get_default_replications"); - NameIdentifier filesetIdent = NameIdentifier.of(schemaName, filesetName); - Catalog catalog = metalake.loadCatalog(catalogName); - String storageLocation = genStorageLocation(filesetName); - catalog - .asFilesetCatalog() - .createFileset( - filesetIdent, - "fileset comment", - Fileset.Type.MANAGED, - storageLocation, - new HashMap<>()); - Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(filesetIdent)); - Path gvfsPath = genGvfsPath(filesetName); - try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) { - // Here HDFS is 3, but for oss is 1. - assertEquals(1, gvfs.getDefaultReplication(gvfsPath)); - } - - catalog.asFilesetCatalog().dropFileset(filesetIdent); - } - @Disabled( "OSS does not support append, java.io.IOException: The append operation is not supported") public void testAppend() throws IOException {} diff --git a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java new file mode 100644 index 00000000000..22951da3ad9 --- /dev/null +++ b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GravitinoVirtualFileSystemS3IT.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.filesystem.hadoop.integration.test; + +import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS; +import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS; + +import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.integration.test.container.GravitinoLocalStackContainer; +import org.apache.gravitino.integration.test.util.GravitinoITUtils; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Container; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +public class GravitinoVirtualFileSystemS3IT extends GravitinoVirtualFileSystemIT { + private static final Logger LOG = LoggerFactory.getLogger(GravitinoVirtualFileSystemS3IT.class); + + private String bucketName = "s3-bucket-" + UUID.randomUUID().toString().replace("-", ""); + private String accessKey; + private String secretKey; + private String s3Endpoint; + + private GravitinoLocalStackContainer gravitinoLocalStackContainer; + + @BeforeAll + public void startIntegrationTest() { + // Do nothing + } + + private void startS3Mocker() { + containerSuite.startLocalStackContainer(); + gravitinoLocalStackContainer = containerSuite.getLocalStackContainer(); + + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS) + .until( + () -> { + try { + Container.ExecResult result = + gravitinoLocalStackContainer.executeInContainer( + "awslocal", "iam", "create-user", "--user-name", "anonymous"); + return result.getExitCode() == 0; + } catch (Exception e) { + LOG.info("LocalStack is not ready yet: ", e); + return false; + } + }); + + gravitinoLocalStackContainer.executeInContainer("awslocal", "s3", "mb", "s3://" + bucketName); + + Container.ExecResult result = + gravitinoLocalStackContainer.executeInContainer( + "awslocal", "iam", "create-access-key", "--user-name", "anonymous"); + + gravitinoLocalStackContainer.executeInContainer( + "awslocal", + "s3api", + "put-bucket-acl", + "--bucket", + bucketName, + "--acl", + "public-read-write"); + + // Get access key and secret key from result + String[] lines = result.getStdout().split("\n"); + accessKey = lines[3].split(":")[1].trim().substring(1, 21); + secretKey = lines[5].split(":")[1].trim().substring(1, 41); + + LOG.info("Access key: " + accessKey); + LOG.info("Secret key: " + secretKey); + + s3Endpoint = + String.format("http://%s:%d", gravitinoLocalStackContainer.getContainerIpAddress(), 4566); + } + + @BeforeAll + public void startUp() throws Exception { + copyBundleJarsToHadoop("aws-bundle"); + + // Start s3 simulator + startS3Mocker(); + + // Need to 
download the bundle jars to the Gravitino server first + super.startIntegrationTest(); + + // This value can be tuned by the user; please change it accordingly. + defaultBockSize = 32 * 1024 * 1024; + + // The value is 1 for S3 + defaultReplication = 1; + + metalakeName = GravitinoITUtils.genRandomName("gvfs_it_metalake"); + catalogName = GravitinoITUtils.genRandomName("catalog"); + schemaName = GravitinoITUtils.genRandomName("schema"); + + Assertions.assertFalse(client.metalakeExists(metalakeName)); + metalake = client.createMetalake(metalakeName, "metalake comment", Collections.emptyMap()); + Assertions.assertTrue(client.metalakeExists(metalakeName)); + + Map<String, String> properties = Maps.newHashMap(); + properties.put("gravitino.bypass.fs.s3a.access.key", accessKey); + properties.put("gravitino.bypass.fs.s3a.secret.key", secretKey); + properties.put("gravitino.bypass.fs.s3a.endpoint", s3Endpoint); + properties.put( + "gravitino.bypass.fs.s3a.aws.credentials.provider", + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); + properties.put(FILESYSTEM_PROVIDERS, "s3"); + + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + + catalog.asSchemas().createSchema(schemaName, "schema comment", properties); + Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName)); + + conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem"); + conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs"); + conf.set("fs.gvfs.impl.disable.cache", "true"); + conf.set("fs.gravitino.server.uri", serverUri); + conf.set("fs.gravitino.client.metalake", metalakeName); + + // Pass this configuration to the real file system + conf.set("fs.s3a.access.key", accessKey); + conf.set("fs.s3a.secret.key", secretKey); + conf.set("fs.s3a.endpoint", s3Endpoint); + conf.set( + "fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"); + conf.set(FS_FILESYSTEM_PROVIDERS, "s3"); + } + + @AfterAll + public void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName, true); + client.dropMetalake(metalakeName); + + if (client != null) { + client.close(); + client = null; + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Exception in closing CloseableGroup", e); + } + } + + /** + * Remove the `gravitino.bypass` prefix from the configuration and pass it to the real file system. + * This method corresponds to the method org.apache.gravitino.filesystem.hadoop + * .GravitinoVirtualFileSystem#getConfigMap(Configuration) in the original code. 
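+   * For example, gravitino.bypass.fs.s3a.endpoint is handed to the underlying S3A file system as fs.s3a.endpoint.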
+ */ + protected Configuration convertGvfsConfigToRealFileSystemConfig(Configuration gvfsConf) { + Configuration s3Conf = new Configuration(); + gvfsConf.forEach( + entry -> { + s3Conf.set(entry.getKey().replace("gravitino.bypass.", ""), entry.getValue()); + }); + + return s3Conf; + } + + protected String genStorageLocation(String fileset) { + return String.format("s3a://%s/%s", bucketName, fileset); + } + + @Disabled( + "S3 does not support append, java.io.IOException: The append operation is not supported") + public void testAppend() throws IOException {} +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index fc06975c19d..6a50fc2b455 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -150,7 +150,8 @@ hadoop2-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref hadoop2-hdfs-client = { group = "org.apache.hadoop", name = "hadoop-hdfs-client", version.ref = "hadoop2" } hadoop2-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop2"} hadoop2-mapreduce-client-core = { group = "org.apache.hadoop", name = "hadoop-mapreduce-client-core", version.ref = "hadoop2"} -hadoop2-s3 = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref = "hadoop2"} +hadoop2-aws = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref = "hadoop2"} +hadoop3-aws = { group = "org.apache.hadoop", name = "hadoop-aws", version.ref = "hadoop3"} hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref = "hadoop3" } hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop3"} hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", version.ref = "hadoop3"} diff --git a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java index 6ccac7dd76e..0fe4d728c73 100644 --- a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java +++ b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/HttpUtils.java @@ -29,7 +29,7 @@ public class HttpUtils { private static final Logger LOG = LoggerFactory.getLogger(HttpUtils.class); /** - * Check if the http server is up, If http response status code is 200, then we're assuming the + * Check if the http server is up. If http response status code is 200, then we're assuming the * server is up. Or else we assume the server is not ready. * *

<p>Note: The method will ignore the response body and only check the status code. diff --git a/settings.gradle.kts b/settings.gradle.kts index 10cf107497c..6d08431f0fb 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -70,6 +70,7 @@ project(":spark-connector:spark-runtime-3.5").projectDir = file("spark-connector include("web:web", "web:integration-test") include("docs") include("integration-test-common") +include(":bundles:aws-bundle") include(":bundles:gcp-bundle") include("bundles:aliyun-bundle") findProject(":bundles:aliyun-bundle")?.name = "aliyun-bundle"
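
For reference, a minimal usage sketch of what this patch enables: creating an S3-backed fileset catalog and reading it through GVFS. It reuses only API calls and property keys that appear in the integration tests above; the metalake handle, catalog/schema/fileset names, server URI, endpoint, and credentials are illustrative placeholders, and schema/fileset creation is elided.

import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS;
import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.client.GravitinoMetalake;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3FilesetSketch {
  // `metalake` is assumed to be an already-connected GravitinoMetalake handle.
  static void useS3Fileset(GravitinoMetalake metalake, String serverUri) throws IOException {
    Map<String, String> properties = Maps.newHashMap();
    // "gravitino.bypass."-prefixed keys are stripped by S3FileSystemProvider and
    // handed to the server-side S3A client as plain Hadoop configuration.
    properties.put("gravitino.bypass.fs.s3a.access.key", "<access-key>");
    properties.put("gravitino.bypass.fs.s3a.secret.key", "<secret-key>");
    properties.put("gravitino.bypass.fs.s3a.endpoint", "<s3-endpoint>");
    properties.put(
        "gravitino.bypass.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    properties.put(FILESYSTEM_PROVIDERS, "s3"); // select the aws-bundle provider by name
    metalake.createCatalog("s3_catalog", Catalog.Type.FILESET, "hadoop", "comment", properties);
    // ... create a schema and a fileset with an s3a:// storage location here,
    // as HadoopS3CatalogIT does above.

    // Client side: route file operations through GVFS.
    Configuration conf = new Configuration();
    conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
    conf.set("fs.AbstractFileSystem.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.Gvfs");
    conf.set("fs.gravitino.server.uri", serverUri);
    conf.set("fs.gravitino.client.metalake", metalake.name());
    conf.set("fs.s3a.access.key", "<access-key>");
    conf.set("fs.s3a.secret.key", "<secret-key>");
    conf.set("fs.s3a.endpoint", "<s3-endpoint>");
    conf.set(FS_FILESYSTEM_PROVIDERS, "s3");

    // Virtual path layout used by the tests: gvfs://fileset/<catalog>/<schema>/<fileset>/...
    Path gvfsPath = new Path("gvfs://fileset/s3_catalog/my_schema/my_fileset/data.txt");
    try (FileSystem gvfs = gvfsPath.getFileSystem(conf)) {
      gvfs.exists(gvfsPath);
    }
  }
}

The catalog-side and client-side credentials are set separately above because the server resolves storage locations through S3FileSystemProvider while the GVFS client opens the s3a:// paths itself, mirroring the split between createCatalog properties and conf settings in GravitinoVirtualFileSystemS3IT.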