From 6c701cdeca114444c46ef2edc7b9bc8ad85fbcdf Mon Sep 17 00:00:00 2001 From: Clearvive Date: Fri, 12 Jan 2024 15:54:17 +0800 Subject: [PATCH] [#1460] feat(iceberg): Iceberg supports distributions. --- .../distributions/Distributions.java | 30 ++++ .../iceberg/IcebergCatalogOperations.java | 7 +- .../lakehouse/iceberg/IcebergTable.java | 68 +++++++- .../IcebergTablePropertiesMetadata.java | 6 +- .../gravitino/dto/rel/DistributionDTO.java | 3 + docs/lakehouse-iceberg-catalog.md | 22 ++- .../lakehouse/iceberg/CatalogIcebergIT.java | 163 ++++++++++++++++++ 7 files changed, 287 insertions(+), 12 deletions(-) diff --git a/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java b/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java index 091c75ee580..f824690b7eb 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/expressions/distributions/Distributions.java @@ -26,6 +26,15 @@ public static Distribution even(int number, Expression... expressions) { return new DistributionImpl(Strategy.EVEN, number, expressions); } + /** + * Create a distribution by hashing the data. + * + * @return The created hash distribution + */ + public static Distribution hash(Expression... expressions) { + return hash(0, expressions); + } + /** * Create a distribution by hashing the data across the number of buckets. * @@ -37,6 +46,27 @@ public static Distribution hash(int number, Expression... expressions) { return new DistributionImpl(Strategy.HASH, number, expressions); } + /** + * Create a distribution by range the data. + * + * @param expressions The expressions to distribute by + * @return The created range distribution + */ + public static Distribution range(Expression... expressions) { + return range(0, expressions); + } + + /** + * Create a distribution by range the data across the number of buckets. + * + * @param number The number of buckets + * @param expressions The expressions to distribute by + * @return The created range distribution + */ + public static Distribution range(int number, Expression... expressions) { + return new DistributionImpl(Strategy.RANGE, number, expressions); + } + /** * Create a distribution by the given strategy. * diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 4b6cdaf9074..566c4b04355 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -27,7 +27,6 @@ import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; -import com.datastrato.gravitino.rel.expressions.distributions.Distributions; import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.utils.MapUtils; @@ -483,10 +482,6 @@ public Table createTable( SortOrder[] sortOrders) throws NoSuchSchemaException, TableAlreadyExistsException { try { - if (!Distributions.NONE.equals(distribution)) { - throw new UnsupportedOperationException("Iceberg does not support distribution"); - } - NameIdentifier schemaIdent = NameIdentifier.of(tableIdent.namespace().levels()); if (!schemaExists(schemaIdent)) { LOG.warn("Iceberg schema (database) does not exist: {}", schemaIdent); @@ -512,7 +507,7 @@ public Table createTable( .withPartitioning(partitioning) .withSortOrders(sortOrders) .withProperties(properties) - .withDistribution(Distributions.NONE) + .withDistribution(distribution) .withAuditInfo( new AuditInfo.Builder() .withCreator(currentUser()) diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 6ed6de81731..5638fe1248f 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.catalog.lakehouse.iceberg; +import static com.datastrato.gravitino.catalog.lakehouse.iceberg.IcebergTablePropertiesMetadata.DISTRIBUTION_MODE; + import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.ConvertUtil; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergPartitionSpec; import com.datastrato.gravitino.catalog.lakehouse.iceberg.converter.FromIcebergSortOrder; @@ -12,10 +14,17 @@ import com.datastrato.gravitino.catalog.lakehouse.iceberg.ops.IcebergTableOpsHelper; import com.datastrato.gravitino.catalog.rel.BaseTable; import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.rel.expressions.distributions.Distribution; +import com.datastrato.gravitino.rel.expressions.distributions.Distributions; +import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; +import com.datastrato.gravitino.rel.expressions.transforms.Transform; +import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import java.util.Map; import lombok.Getter; import lombok.ToString; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.Schema; import org.apache.iceberg.TableMetadata; import org.apache.iceberg.rest.requests.CreateTableRequest; @@ -46,6 +55,35 @@ private IcebergTable() {} public CreateTableRequest toCreateTableRequest() { Schema schema = ConvertUtil.toIcebergSchema(this); + properties = properties == null ? Maps.newHashMap() : Maps.newHashMap(properties); + if (null == distribution) { + properties.put(DISTRIBUTION_MODE, DistributionMode.NONE.modeName()); + } else { + switch (distribution.strategy()) { + case HASH: + Preconditions.checkArgument( + ArrayUtils.isEmpty(distribution.expressions()), + "Iceberg's Distribution Mode.HASH not support set expressions."); + Preconditions.checkArgument( + ArrayUtils.isNotEmpty(partitioning), + "Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty."); + properties.put(DISTRIBUTION_MODE, DistributionMode.HASH.modeName()); + break; + case RANGE: + Preconditions.checkArgument( + ArrayUtils.isEmpty(distribution.expressions()), + "Iceberg's Distribution Mode.RANGE not support set expressions."); + Preconditions.checkArgument( + ArrayUtils.isNotEmpty(partitioning) && ArrayUtils.isNotEmpty(sortOrders), + "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty."); + properties.put(DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()); + break; + case EVEN: + default: + throw new IllegalArgumentException( + "Iceberg unsupported distribution strategy: " + distribution.strategy()); + } + } Map resultProperties = Maps.newHashMap(IcebergTableOpsHelper.removeReservedProperties(properties)); resultProperties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); @@ -69,8 +107,30 @@ public CreateTableRequest toCreateTableRequest() { */ public static IcebergTable fromIcebergTable(TableMetadata table, String tableName) { Map properties = table.properties(); - Schema schema = table.schema(); + Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema); + SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder()); + Distribution distribution = Distributions.NONE; + String distributionName = properties.get(DISTRIBUTION_MODE); + if (null != distributionName) { + switch (DistributionMode.fromName(distributionName)) { + case HASH: + Preconditions.checkArgument( + ArrayUtils.isNotEmpty(partitionSpec), + "Iceberg's Distribution Mode.HASH is distributed based on partition, but the partition is empty."); + distribution = Distributions.hash(); + break; + case RANGE: + Preconditions.checkArgument( + ArrayUtils.isNotEmpty(partitionSpec) && ArrayUtils.isNotEmpty(sortOrder), + "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty."); + distribution = Distributions.range(); + break; + case NONE: + // do nothing + break; + } + } IcebergColumn[] icebergColumns = schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new); return new IcebergTable.Builder() @@ -80,8 +140,9 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam .withColumns(icebergColumns) .withName(tableName) .withAuditInfo(AuditInfo.EMPTY) - .withPartitioning(FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema)) - .withSortOrders(FromIcebergSortOrder.fromSortOrder(table.sortOrder())) + .withPartitioning(partitionSpec) + .withSortOrders(sortOrder) + .withDistribution(distribution) .build(); } @@ -115,6 +176,7 @@ protected IcebergTable internalBuild() { icebergTable.location = icebergTable.properties.get(PROP_LOCATION); } icebergTable.partitioning = partitioning; + icebergTable.distribution = distribution; icebergTable.sortOrders = sortOrders; if (null != comment) { icebergTable.properties.putIfAbsent(ICEBERG_COMMENT_FIELD_NAME, comment); diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java index c227f80d64f..9b3e5314d45 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergTablePropertiesMetadata.java @@ -12,6 +12,7 @@ import com.google.common.collect.Maps; import java.util.List; import java.util.Map; +import org.apache.iceberg.TableProperties; public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String COMMENT = "comment"; @@ -22,6 +23,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { public static final String SORT_ORDER = "sort-order"; public static final String IDENTIFIER_FIELDS = "identifier-fields"; + public static final String DISTRIBUTION_MODE = TableProperties.WRITE_DISTRIBUTION_MODE; + private static final Map> PROPERTIES_METADATA; static { @@ -41,7 +44,8 @@ public class IcebergTablePropertiesMetadata extends BasePropertiesMetadata { stringReservedPropertyEntry( SORT_ORDER, "Selecting a specific snapshot in a merge operation", false), stringReservedPropertyEntry( - IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false)); + IDENTIFIER_FIELDS, "The identifier field(s) for defining the table", false), + stringReservedPropertyEntry(DISTRIBUTION_MODE, "Write distribution mode", false)); PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java index 58d550e1ddf..4a3b30d3eb4 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/DistributionDTO.java @@ -12,6 +12,7 @@ import com.datastrato.gravitino.rel.expressions.Expression; import com.datastrato.gravitino.rel.expressions.distributions.Distribution; import com.datastrato.gravitino.rel.expressions.distributions.Strategy; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.google.common.base.Preconditions; @@ -27,9 +28,11 @@ public class DistributionDTO implements Distribution { // Distribution strategy/method private final Strategy strategy; + @JsonIgnoreProperties(ignoreUnknown = true) // Number of buckets/distribution private final int number; + @JsonIgnoreProperties(ignoreUnknown = true) private final FunctionArg[] args; private DistributionDTO(Strategy strategy, int number, FunctionArg[] args) { diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index a69e7abc623..4c279af4d00 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -111,10 +111,28 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the ### Table distributions -- Doesn't support `Distribution`, you should use `BucketPartition` instead. +- Gravitino used by default `HashDistribution`. Hash distribute by partition key. +```json +{ + "strategy": "hash" +} +``` +```java +Distributions.hash(); +``` + +- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder. +```json +{ + "strategy": "range" +} +``` +```java +Distributions.range(); +``` :::info -If you load Iceberg tables, the table distribution strategy is `hash` with num 0, which means no distribution. +Iceberg automatically calculates the bucket value and performs hashing based on the partition field, or automatically calculates ranges according to SortOrder. ::: ### Table column types diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java index 789a81ab56f..d134ee7a6b0 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/catalog/lakehouse/iceberg/CatalogIcebergIT.java @@ -20,6 +20,7 @@ import com.datastrato.gravitino.dto.rel.partitions.DayPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitions.IdentityPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitions.Partitioning; +import com.datastrato.gravitino.dto.util.DTOConverters; import com.datastrato.gravitino.exceptions.NoSuchSchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; @@ -55,6 +56,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.iceberg.NullOrder; import org.apache.iceberg.PartitionField; @@ -811,4 +813,165 @@ public void testOperatorSchemeProperties() { Assertions.assertThrows( NoSuchSchemaException.class, () -> catalog.asSchemas().loadSchema(ident)); } + + @Test + public void testTableDistribution() { + ColumnDTO[] columns = createColumns(); + + NameIdentifier tableIdentifier = + NameIdentifier.of(metalakeName, catalogName, schemaName, tableName); + Distribution distribution = Distributions.NONE; + + final SortOrder[] sortOrders = + new SortOrder[] { + SortOrders.of( + NamedReference.field(ICEBERG_COL_NAME2), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }; + + Partitioning[] partitioning = new Partitioning[] {DayPartitioningDTO.of(columns[1].name())}; + + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + // Create a data table for Distributions.NONE + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + + // check table + assertionsTableInfo( + tableName, + table_comment, + Arrays.asList(columns), + properties, + distribution, + sortOrders, + partitioning, + loadTable); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + + distribution = Distributions.hash(); + // Create a data table for Distributions.hash + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + + loadTable = tableCatalog.loadTable(tableIdentifier); + // check table + assertionsTableInfo( + tableName, + table_comment, + Arrays.asList(columns), + properties, + distribution, + sortOrders, + partitioning, + loadTable); + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + + // Create a data table for Distributions.NONE and set field name + IllegalArgumentException illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> { + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + Distributions.hash(NamedReference.field(columns[1].name())), + sortOrders); + }); + Assertions.assertTrue( + StringUtils.contains( + illegalArgumentException.getMessage(), + "Iceberg's Distribution Mode.HASH not support set expressions.")); + + distribution = Distributions.range(); + // Create a data table for Distributions.hash + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + + loadTable = tableCatalog.loadTable(tableIdentifier); + // check table + assertionsTableInfo( + tableName, + table_comment, + Arrays.asList(columns), + properties, + distribution, + sortOrders, + partitioning, + loadTable); + + Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier)); + + // Create a data table for Distributions.range and set field name + illegalArgumentException = + assertThrows( + IllegalArgumentException.class, + () -> { + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + Distributions.range(NamedReference.field(columns[1].name())), + sortOrders); + }); + Assertions.assertTrue( + StringUtils.contains( + illegalArgumentException.getMessage(), + "Iceberg's Distribution Mode.RANGE not support set expressions.")); + } + + protected static void assertionsTableInfo( + String tableName, + String tableComment, + List columns, + Map properties, + Distribution distribution, + SortOrder[] sortOrder, + Partitioning[] partitioning, + Table table) { + Assertions.assertEquals(tableName, table.name()); + Assertions.assertEquals(tableComment, table.comment()); + Assertions.assertEquals(columns.size(), table.columns().length); + Assertions.assertEquals(DTOConverters.toDTO(distribution), table.distribution()); + Assertions.assertArrayEquals(DTOConverters.toDTOs(sortOrder), table.sortOrder()); + Assertions.assertArrayEquals(DTOConverters.toDTOs(partitioning), table.partitioning()); + for (int i = 0; i < columns.size(); i++) { + Assertions.assertEquals(columns.get(i).name(), table.columns()[i].name()); + Assertions.assertEquals(columns.get(i).dataType(), table.columns()[i].dataType()); + Assertions.assertEquals(columns.get(i).nullable(), table.columns()[i].nullable()); + Assertions.assertEquals(columns.get(i).comment(), table.columns()[i].comment()); + Assertions.assertEquals(columns.get(i).autoIncrement(), table.columns()[i].autoIncrement()); + } + + for (Map.Entry entry : properties.entrySet()) { + Assertions.assertEquals(entry.getValue(), table.properties().get(entry.getKey())); + } + } }