diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java index 27e3c429e7c..8c97e67d922 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java @@ -131,8 +131,15 @@ String transformDistribution(Distribution distribution) { ArrayUtils.isNotEmpty(partitioning) || ArrayUtils.isNotEmpty(sortOrders), "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty."); return DistributionMode.RANGE.modeName(); + // Gravitino NONE distribution means the client side doesn't specify distribution. case NONE: - return DistributionMode.NONE.modeName(); + if (ArrayUtils.isNotEmpty(sortOrders)) { + return DistributionMode.RANGE.modeName(); + } else if (ArrayUtils.isNotEmpty(partitioning)) { + return DistributionMode.HASH.modeName(); + } else { + return DistributionMode.NONE.modeName(); + } default: throw new IllegalArgumentException( "Iceberg unsupported distribution strategy: " + distribution.strategy()); diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java index 28ffc707df3..26b0e6e1af9 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/TestIcebergTable.java @@ -561,7 +561,8 @@ public void testTableDistribution() { add(col_2); } }; - IcebergTable icebergTable = + + IcebergTable tableWithoutPartitionAdnSortOrder = IcebergTable.builder() .withName("test_table") .withAuditInfo( @@ -571,13 +572,14 @@ public void testTableDistribution() { .withComment("test_table") .build(); String none = - Assertions.assertDoesNotThrow(() -> icebergTable.transformDistribution(Distributions.NONE)); + Assertions.assertDoesNotThrow( + () -> tableWithoutPartitionAdnSortOrder.transformDistribution(Distributions.NONE)); Assertions.assertEquals(none, DistributionMode.NONE.modeName()); IllegalArgumentException illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, - () -> icebergTable.transformDistribution(Distributions.HASH)); + () -> tableWithoutPartitionAdnSortOrder.transformDistribution(Distributions.HASH)); Assertions.assertTrue( StringUtils.contains( illegalArgumentException.getMessage(), @@ -586,13 +588,13 @@ public void testTableDistribution() { illegalArgumentException = Assertions.assertThrows( IllegalArgumentException.class, - () -> icebergTable.transformDistribution(Distributions.RANGE)); + () -> tableWithoutPartitionAdnSortOrder.transformDistribution(Distributions.RANGE)); Assertions.assertTrue( StringUtils.contains( illegalArgumentException.getMessage(), "Iceberg's Distribution Mode.RANGE is distributed based on sortOrder or partition, but both are empty")); - IcebergTable newTable = + IcebergTable tableWithPartitionAndSortorder = IcebergTable.builder() .withName("test_table2") .withAuditInfo( @@ -610,14 +612,60 @@ public void testTableDistribution() { .withComment("test_table2") .build(); String distributionName = - Assertions.assertDoesNotThrow(() -> newTable.transformDistribution(Distributions.NONE)); - Assertions.assertEquals(distributionName, DistributionMode.NONE.modeName()); + Assertions.assertDoesNotThrow( + () -> tableWithPartitionAndSortorder.transformDistribution(Distributions.NONE)); + Assertions.assertEquals(distributionName, DistributionMode.RANGE.modeName()); distributionName = - Assertions.assertDoesNotThrow(() -> newTable.transformDistribution(Distributions.HASH)); + Assertions.assertDoesNotThrow( + () -> tableWithPartitionAndSortorder.transformDistribution(Distributions.HASH)); Assertions.assertEquals(distributionName, DistributionMode.HASH.modeName()); distributionName = - Assertions.assertDoesNotThrow(() -> newTable.transformDistribution(Distributions.RANGE)); + Assertions.assertDoesNotThrow( + () -> tableWithPartitionAndSortorder.transformDistribution(Distributions.RANGE)); + Assertions.assertEquals(distributionName, DistributionMode.RANGE.modeName()); + + IcebergTable tableWithPartition = + IcebergTable.builder() + .withName("test_table2") + .withAuditInfo( + AuditInfo.builder().withCreator("test2").withCreateTime(Instant.now()).build()) + .withProperties(Maps.newHashMap()) + .withPartitioning(new Transform[] {day("col_1")}) + .withColumns(icebergColumns.toArray(new IcebergColumn[0])) + .withComment("test_table2") + .build(); + distributionName = tableWithPartition.transformDistribution(Distributions.NONE); + Assertions.assertEquals(distributionName, DistributionMode.HASH.modeName()); + distributionName = tableWithPartition.transformDistribution(Distributions.HASH); + Assertions.assertEquals(distributionName, DistributionMode.HASH.modeName()); + distributionName = tableWithPartition.transformDistribution(Distributions.RANGE); + Assertions.assertEquals(distributionName, DistributionMode.RANGE.modeName()); + + IcebergTable tableWithSortOrder = + IcebergTable.builder() + .withName("test_table2") + .withAuditInfo( + AuditInfo.builder().withCreator("test2").withCreateTime(Instant.now()).build()) + .withProperties(Maps.newHashMap()) + .withSortOrders( + new SortOrder[] { + SortOrders.of( + NamedReference.field("col_1"), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }) + .withColumns(icebergColumns.toArray(new IcebergColumn[0])) + .withComment("test_table2") + .build(); + distributionName = tableWithSortOrder.transformDistribution(Distributions.NONE); + Assertions.assertEquals(distributionName, DistributionMode.RANGE.modeName()); + distributionName = tableWithSortOrder.transformDistribution(Distributions.RANGE); Assertions.assertEquals(distributionName, DistributionMode.RANGE.modeName()); + Assertions.assertThrowsExactly( + IllegalArgumentException.class, + () -> { + tableWithSortOrder.transformDistribution(Distributions.HASH); + }); } protected static String genRandomName() { diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index fd37441b459..67f030a4a7f 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -379,6 +379,76 @@ void testCreateTableWithNullComment() { Assertions.assertNull(loadTable.comment()); } + @Test + void testCreateTableWithoutDistribution() { + // Create table from Gravitino API + Column[] columns = createColumns(); + + NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName); + Distribution distribution = Distributions.NONE; + + final SortOrder[] sortOrders = + new SortOrder[] { + SortOrders.of( + NamedReference.field(ICEBERG_COL_NAME2), + SortDirection.DESCENDING, + NullOrdering.NULLS_FIRST) + }; + + Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())}; + Map properties = createProperties(); + TableCatalog tableCatalog = catalog.asTableCatalog(); + Table tableWithPartitionAndSortorder = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + sortOrders); + Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name()); + Assertions.assertEquals(Distributions.RANGE, tableWithPartitionAndSortorder.distribution()); + + Table loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.RANGE, loadTable.distribution()); + tableCatalog.dropTable(tableIdentifier); + + Table tableWithPartition = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + partitioning, + distribution, + new SortOrder[0]); + Assertions.assertEquals(tableName, tableWithPartition.name()); + Assertions.assertEquals(Distributions.HASH, tableWithPartition.distribution()); + + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.HASH, loadTable.distribution()); + tableCatalog.dropTable(tableIdentifier); + + Table tableWithoutPartitionAndSortOrder = + tableCatalog.createTable( + tableIdentifier, + columns, + table_comment, + properties, + new Transform[0], + distribution, + new SortOrder[0]); + Assertions.assertEquals(tableName, tableWithoutPartitionAndSortOrder.name()); + Assertions.assertEquals(Distributions.NONE, tableWithoutPartitionAndSortOrder.distribution()); + + loadTable = tableCatalog.loadTable(tableIdentifier); + Assertions.assertEquals(tableName, loadTable.name()); + Assertions.assertEquals(Distributions.NONE, loadTable.distribution()); + } + @Test void testCreateAndLoadIcebergTable() { // Create table from Gravitino API diff --git a/docs/lakehouse-iceberg-catalog.md b/docs/lakehouse-iceberg-catalog.md index 6ad011d7160..f8b462b24e5 100644 --- a/docs/lakehouse-iceberg-catalog.md +++ b/docs/lakehouse-iceberg-catalog.md @@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the ### Table distributions -- Gravitino used by default `NoneDistribution`. +- Support `HashDistribution`, which distribute data by partition key. +- Support `RangeDistribution`, which distribute data by partition key or sort key for a SortOrder table. +- Doesn't support `EvenDistribution`. - - - -```json -{ - "strategy": "none", - "number": 0, - "expressions": [] -} -``` - - - - -```java -Distributions.NONE; -``` - - - - -- Support `HashDistribution`, Hash distribute by partition key. - - - - -```json -{ - "strategy": "hash", - "number": 0, - "expressions": [] -} -``` - - - -```java -Distributions.HASH; -``` - - - - -- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder. - - - - -```json -{ - "strategy": "range", - "number": 0, - "expressions": [] -} -``` - - - - -```java -Distributions.RANGE; -``` - - - - -:::info -Iceberg automatically distributes the data according to the partition or table sort order. It is forbidden to specify distribution expressions. -::: :::info -Apache Iceberg doesn't support Gravitino `EvenDistribution` type. +If you doesn't specify distribution expressions, the table distribution will be adjusted to `RangeDistribution` for a sort order table, to `HashDistribution` for a partition table. ::: ### Table column types