Skip to content

Commit

Permalink
feat: Added support to write iceberg tables (#5989)
Browse files Browse the repository at this point in the history
Closes: #6125 

Also moves existing Iceberg tests from Junit4 to Junit5.
  • Loading branch information
malhotrashivam authored Nov 22, 2024
1 parent 3a547af commit ecdc8e7
Show file tree
Hide file tree
Showing 31 changed files with 3,157 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.HttpClientProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.rest.RESTCatalog;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -22,9 +24,8 @@
import java.util.Map;

/**
* Tools for accessing tables in the Iceberg table format from S3.
*/
@SuppressWarnings("unused")
public final class IcebergToolsS3 {

/**
Expand Down Expand Up @@ -52,14 +53,6 @@ public static IcebergCatalogAdapter createS3Rest(

// Set up the properties map for the Iceberg catalog
final Map<String, String> properties = new HashMap<>();

final RESTCatalog catalog = new RESTCatalog();

properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
properties.put(CatalogProperties.URI, catalogURI);
properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

// Configure the properties map from the Iceberg instructions.
if (!Strings.isNullOrEmpty(accessKeyId) && !Strings.isNullOrEmpty(secretAccessKey)) {
properties.put(S3FileIOProperties.ACCESS_KEY_ID, accessKeyId);
properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, secretAccessKey);
Expand All @@ -71,10 +64,9 @@ public static IcebergCatalogAdapter createS3Rest(
properties.put(S3FileIOProperties.ENDPOINT, endpointOverride);
}

final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
final RESTCatalog catalog = new RESTCatalog();
catalog.setConf(new Configuration());
catalog.initialize(catalogName, properties);
return IcebergCatalogAdapter.of(catalog);
return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
}

/**
Expand All @@ -97,16 +89,28 @@ public static IcebergCatalogAdapter createGlue(
final Map<String, String> properties = new HashMap<>();

final GlueCatalog catalog = new GlueCatalog();
catalog.setConf(new Configuration());
return createAdapterCommon(name, catalogURI, warehouseLocation, catalog, properties);
}

/**
 * Shared tail of the catalog factory methods: fills in the common catalog properties, initializes the supplied
 * catalog, and wraps it in an {@link IcebergCatalogAdapter}.
 * <p>
 * The supplied {@code properties} map is modified in place; entries already present (for example credentials or an
 * endpoint override added by the caller) are kept, and the catalog-impl, URI, warehouse-location and file-IO-impl
 * keys are (over)written here.
 * <p>
 * Any Hadoop configuration must be applied to the concrete catalog by the caller before invoking this method (as
 * {@code createS3Rest} and {@code createGlue} do); it is not set here because this method only sees the catalog
 * through the {@link Catalog} interface.
 *
 * @param name the catalog name, or {@code null} to derive one as {@code "IcebergCatalog-" + catalogURI}
 * @param catalogURI the URI of the catalog
 * @param warehouseLocation the warehouse location
 * @param catalog the not-yet-initialized catalog instance to configure
 * @param properties the mutable properties map used to initialize the catalog
 * @return an adapter wrapping the initialized catalog
 */
private static IcebergCatalogAdapter createAdapterCommon(
        @Nullable final String name,
        @NotNull final String catalogURI,
        @NotNull final String warehouseLocation,
        @NotNull final Catalog catalog,
        @NotNull final Map<String, String> properties) {
    properties.put(CatalogProperties.CATALOG_IMPL, catalog.getClass().getName());
    properties.put(CatalogProperties.URI, catalogURI);
    properties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation);

    // Following is needed to write new manifest files when writing new data.
    // Not setting this will result in using ResolvingFileIO.
    properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());

    // Fall back to a name derived from the URI when the caller did not supply one.
    final String catalogName = name != null ? name : "IcebergCatalog-" + catalogURI;
    catalog.initialize(catalogName, properties);

    // Single exit point via the static factory; a second return after this one would be unreachable.
    return IcebergCatalogAdapter.of(catalog, properties);
}

/**
Expand Down
Loading

0 comments on commit ecdc8e7

Please sign in to comment.