From bb0a3187f6ea06cb7fbaa975f369bb44d6a2b8fe Mon Sep 17 00:00:00 2001 From: Jerry Shao Date: Mon, 13 May 2024 16:54:45 +0800 Subject: [PATCH] [#2266] fix(partition): enable preassign partition when creating range and list partitioning table (#3363) ### What changes were proposed in this pull request? enable pre-assign partition when creating range and list partitioning table ### Why are the changes needed? Fix: #2266 ### Does this PR introduce _any_ user-facing change? yes, enable pre-assign partition when creating range and list partitioning table ### How was this patch tested? tests modified Co-authored-by: mchades --- .../rel/expressions/transforms/Transform.java | 12 +- .../expressions/transforms/Transforms.java | 53 ++- .../gravitino/rel/partitions/Partitions.java | 3 + .../client/TestRelationalCatalog.java | 56 ++++ .../rel/partitioning/ListPartitioningDTO.java | 55 ++-- .../partitioning/RangePartitioningDTO.java | 55 ++-- .../gravitino/dto/util/DTOConverters.java | 28 +- .../datastrato/gravitino/json/JsonUtils.java | 305 +++++++++++------- .../gravitino/json/TestDTOJsonSerDe.java | 133 +++++++- docs/open-api/partitioning.yaml | 10 + .../server/web/rest/TestTableOperations.java | 57 ++++ 11 files changed, 553 insertions(+), 214 deletions(-) diff --git a/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transform.java b/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transform.java index 8bbe9596ecf..46478a79d80 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transform.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transform.java @@ -20,9 +20,12 @@ package com.datastrato.gravitino.rel.expressions.transforms; +import static com.datastrato.gravitino.rel.partitions.Partitions.EMPTY_PARTITIONS; + import com.datastrato.gravitino.annotation.Evolving; import com.datastrato.gravitino.rel.expressions.Expression; import com.datastrato.gravitino.rel.expressions.NamedReference; +import com.datastrato.gravitino.rel.partitions.Partition; import java.util.Objects; /** @@ -40,11 +43,12 @@ public interface Transform extends Expression { Expression[] arguments(); /** - * @return The preassigned partitions in the partitioning. Currently, only ListTransform and - * RangeTransform need to deal with assignments + * @return The preassigned partitions in the partitioning. Currently, only {@link + * Transforms.ListTransform} and {@link Transforms.RangeTransform} need to deal with + * assignments */ - default Expression[] assignments() { - return Expression.EMPTY_EXPRESSION; + default Partition[] assignments() { + return EMPTY_PARTITIONS; } @Override diff --git a/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transforms.java b/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transforms.java index 6499f3cad67..ecc98a76ad0 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transforms.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/expressions/transforms/Transforms.java @@ -8,6 +8,8 @@ import com.datastrato.gravitino.rel.expressions.NamedReference; import com.datastrato.gravitino.rel.expressions.literals.Literal; import com.datastrato.gravitino.rel.expressions.literals.Literals; +import com.datastrato.gravitino.rel.partitions.ListPartition; +import com.datastrato.gravitino.rel.partitions.RangePartition; import com.google.common.collect.ObjectArrays; import java.util.Arrays; import java.util.Objects; @@ -166,8 +168,20 @@ public static BucketTransform bucket(int numBuckets, String[]... fieldNames) { * @return The created transform */ public static ListTransform list(String[]... fieldNames) { + return list(fieldNames, new ListPartition[0]); + } + + /** + * Create a transform that includes multiple fields in a list with preassigned list partitions. + * + * @param fieldNames The field names to include in the list + * @param assignments The preassigned list partitions + * @return The created transform + */ + public static ListTransform list(String[][] fieldNames, ListPartition[] assignments) { return new ListTransform( - Arrays.stream(fieldNames).map(NamedReference::field).toArray(NamedReference[]::new)); + Arrays.stream(fieldNames).map(NamedReference::field).toArray(NamedReference[]::new), + assignments); } /** @@ -177,7 +191,18 @@ public static ListTransform list(String[]... fieldNames) { * @return The created transform */ public static RangeTransform range(String[] fieldName) { - return new RangeTransform(NamedReference.field(fieldName)); + return range(fieldName, new RangePartition[0]); + } + + /** + * Create a transform that returns the range of the input value with preassigned range partitions. + * + * @param fieldName The field name to transform + * @param assignments The preassigned range partitions + * @return The created transform + */ + public static RangeTransform range(String[] fieldName, RangePartition[] assignments) { + return new RangeTransform(NamedReference.field(fieldName), assignments); } /** @@ -486,9 +511,16 @@ public int hashCode() { public static final class ListTransform implements Transform { private final NamedReference[] fields; + private final ListPartition[] assignments; private ListTransform(NamedReference[] fields) { this.fields = fields; + this.assignments = new ListPartition[0]; + } + + private ListTransform(NamedReference[] fields, ListPartition[] assignments) { + this.fields = fields; + this.assignments = assignments; } /** @return The field names to include in the list. */ @@ -510,9 +542,8 @@ public Expression[] arguments() { /** @return The assignments to the transform. */ @Override - public Expression[] assignments() { - // todo: resolve this - return Transform.super.assignments(); + public ListPartition[] assignments() { + return assignments; } @Override @@ -537,9 +568,16 @@ public int hashCode() { public static final class RangeTransform implements Transform { private final NamedReference field; + private final RangePartition[] assignments; private RangeTransform(NamedReference field) { this.field = field; + this.assignments = new RangePartition[0]; + } + + private RangeTransform(NamedReference field, RangePartition[] assignments) { + this.field = field; + this.assignments = assignments; } /** @return The field name to transform. */ @@ -561,9 +599,8 @@ public Expression[] arguments() { /** @return The assignments to the transform. */ @Override - public Expression[] assignments() { - // todo: resolve this - return Transform.super.assignments(); + public RangePartition[] assignments() { + return assignments; } @Override diff --git a/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java b/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java index 19e7f60414d..bbd49d10291 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/partitions/Partitions.java @@ -12,6 +12,9 @@ /** The helper class for partition expressions. */ public class Partitions { + /** An empty array of partitions. */ + public static Partition[] EMPTY_PARTITIONS = new Partition[0]; + /** * Creates a range partition. * diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java index a409feec317..4e95b02feee 100644 --- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java +++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java @@ -32,6 +32,8 @@ import com.datastrato.gravitino.dto.rel.partitioning.DayPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitioning.IdentityPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitioning.Partitioning; +import com.datastrato.gravitino.dto.rel.partitioning.RangePartitioningDTO; +import com.datastrato.gravitino.dto.rel.partitions.RangePartitionDTO; import com.datastrato.gravitino.dto.requests.CatalogCreateRequest; import com.datastrato.gravitino.dto.requests.SchemaCreateRequest; import com.datastrato.gravitino.dto.requests.SchemaUpdateRequest; @@ -556,6 +558,60 @@ public void testCreatePartitionedTable() throws JsonProcessingException { tableId, new Column[0], "comment", emptyMap, errorPartitioning)); Assertions.assertTrue( ex3.getMessage().contains("\"columns\" field is required and cannot be empty")); + + // Test partitioning with assignments + Partitioning[] partitioningWithAssignments = { + RangePartitioningDTO.of( + new String[] {columns[0].name()}, + new RangePartitionDTO[] { + RangePartitionDTO.builder() + .withName("p1") + .withLower( + LiteralDTO.builder() + .withDataType(Types.IntegerType.get()) + .withValue("1") + .build()) + .withUpper( + LiteralDTO.builder() + .withDataType(Types.IntegerType.get()) + .withValue("10") + .build()) + .build() + }) + }; + expectedTable = + createMockTable( + "table1", + columns, + "comment", + Collections.emptyMap(), + partitioningWithAssignments, + DistributionDTO.NONE, + SortOrderDTO.EMPTY_SORT); + + req = + new TableCreateRequest( + tableId.name(), + "comment", + columns, + Collections.emptyMap(), + SortOrderDTO.EMPTY_SORT, + DistributionDTO.NONE, + partitioningWithAssignments, + IndexDTO.EMPTY_INDEXES); + resp = new TableResponse(expectedTable); + buildMockResource(Method.POST, tablePath, req, resp, SC_OK); + + table = + catalog + .asTableCatalog() + .createTable( + tableId, + fromDTOs(columns), + "comment", + Collections.emptyMap(), + partitioningWithAssignments); + assertTableEquals(fromDTO(expectedTable), table); } @Test diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/ListPartitioningDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/ListPartitioningDTO.java index c107313c4b5..77f1f789c2d 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/ListPartitioningDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/ListPartitioningDTO.java @@ -7,6 +7,7 @@ import static com.datastrato.gravitino.dto.rel.PartitionUtils.validateFieldExistence; import com.datastrato.gravitino.dto.rel.ColumnDTO; +import com.datastrato.gravitino.dto.rel.partitions.ListPartitionDTO; import com.datastrato.gravitino.rel.expressions.Expression; import com.datastrato.gravitino.rel.expressions.NamedReference; import java.util.Arrays; @@ -17,19 +18,32 @@ public final class ListPartitioningDTO implements Partitioning { /** - * Creates a new ListPartitioningDTO. + * Creates a new ListPartitioningDTO with no pre-assigned partitions. * * @param fieldNames The names of the fields to partition. * @return The new ListPartitioningDTO. */ public static ListPartitioningDTO of(String[][] fieldNames) { - return new ListPartitioningDTO(fieldNames); + return of(fieldNames, new ListPartitionDTO[0]); + } + + /** + * Creates a new ListPartitioningDTO. + * + * @param fieldNames The names of the fields to partition. + * @param assignments The pre-assigned list partitions. + * @return The new ListPartitioningDTO. + */ + public static ListPartitioningDTO of(String[][] fieldNames, ListPartitionDTO[] assignments) { + return new ListPartitioningDTO(fieldNames, assignments); } private final String[][] fieldNames; + private final ListPartitionDTO[] assignments; - private ListPartitioningDTO(String[][] fieldNames) { + private ListPartitioningDTO(String[][] fieldNames, ListPartitionDTO[] assignments) { this.fieldNames = fieldNames; + this.assignments = assignments; } /** @return The names of the fields to partition. */ @@ -37,6 +51,11 @@ public String[][] fieldNames() { return fieldNames; } + @Override + public ListPartitionDTO[] assignments() { + return assignments; + } + /** @return The strategy of the partitioning. */ @Override public Strategy strategy() { @@ -65,34 +84,4 @@ public String name() { public Expression[] arguments() { return Arrays.stream(fieldNames).map(NamedReference::field).toArray(Expression[]::new); } - - /** The builder for ListPartitioningDTO. */ - public static class Builder { - private String[][] fieldNames; - - /** - * Set the field names for the builder. - * - * @param fieldNames The names of the fields to partition. - * @return The builder. - */ - public Builder withFieldNames(String[][] fieldNames) { - this.fieldNames = fieldNames; - return this; - } - - /** - * Builds the ListPartitioningDTO. - * - * @return The ListPartitioningDTO. - */ - public ListPartitioningDTO build() { - return new ListPartitioningDTO(fieldNames); - } - } - - /** @return the builder for creating a new instance of ListPartitioningDTO. */ - public static Builder builder() { - return new Builder(); - } } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/RangePartitioningDTO.java b/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/RangePartitioningDTO.java index be06e86a886..22ede95f044 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/RangePartitioningDTO.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/rel/partitioning/RangePartitioningDTO.java @@ -8,6 +8,7 @@ import static com.datastrato.gravitino.rel.expressions.NamedReference.field; import com.datastrato.gravitino.dto.rel.ColumnDTO; +import com.datastrato.gravitino.dto.rel.partitions.RangePartitionDTO; import com.datastrato.gravitino.rel.expressions.Expression; import lombok.EqualsAndHashCode; @@ -16,19 +17,32 @@ public final class RangePartitioningDTO implements Partitioning { /** - * Creates a new RangePartitioningDTO. + * Creates a new RangePartitioningDTO with no pre-assigned partitions. * * @param fieldName The name of the field to partition. * @return The new RangePartitioningDTO. */ public static RangePartitioningDTO of(String[] fieldName) { - return new RangePartitioningDTO(fieldName); + return of(fieldName, new RangePartitionDTO[0]); + } + + /** + * Creates a new RangePartitioningDTO. + * + * @param fieldName The name of the field to partition. + * @param assignments The pre-assigned range partitions. + * @return The new RangePartitioningDTO. + */ + public static RangePartitioningDTO of(String[] fieldName, RangePartitionDTO[] assignments) { + return new RangePartitioningDTO(fieldName, assignments); } private final String[] fieldName; + private final RangePartitionDTO[] assignments; - private RangePartitioningDTO(String[] fieldName) { + private RangePartitioningDTO(String[] fieldName, RangePartitionDTO[] assignments) { this.fieldName = fieldName; + this.assignments = assignments; } /** @return The name of the field to partition. */ @@ -42,6 +56,11 @@ public String name() { return strategy().name().toLowerCase(); } + @Override + public RangePartitionDTO[] assignments() { + return assignments; + } + /** @return The arguments of the partitioning. */ @Override public Expression[] arguments() { @@ -64,34 +83,4 @@ public Strategy strategy() { public void validate(ColumnDTO[] columns) throws IllegalArgumentException { validateFieldExistence(columns, fieldName); } - - /** The builder for the RangePartitioningDTO. */ - public static class Builder { - private String[] fieldName; - - /** - * Set the field name for the builder. - * - * @param fieldName The name of the field to partition. - * @return The builder. - */ - public Builder withFieldName(String[] fieldName) { - this.fieldName = fieldName; - return this; - } - - /** - * Builds the RangePartitioningDTO. - * - * @return The new RangePartitioningDTO. - */ - public RangePartitioningDTO build() { - return new RangePartitioningDTO(fieldName); - } - } - - /** @return the builder for creating a new instance of RangePartitioningDTO. */ - public static Builder builder() { - return new Builder(); - } } diff --git a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java index be7a6691cc6..507d1f63cc0 100644 --- a/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java +++ b/common/src/main/java/com/datastrato/gravitino/dto/util/DTOConverters.java @@ -301,20 +301,30 @@ public static Partitioning toDTO(Transform transform) { default: throw new IllegalArgumentException("Unsupported transform: " + transform.name()); } + } else if (transform instanceof Transforms.BucketTransform) { return BucketPartitioningDTO.of( ((Transforms.BucketTransform) transform).numBuckets(), ((Transforms.BucketTransform) transform).fieldNames()); + } else if (transform instanceof Transforms.TruncateTransform) { return TruncatePartitioningDTO.of( ((Transforms.TruncateTransform) transform).width(), ((Transforms.TruncateTransform) transform).fieldName()); + } else if (transform instanceof Transforms.ListTransform) { - return ListPartitioningDTO.of(((Transforms.ListTransform) transform).fieldNames()); + Transforms.ListTransform listTransform = (Transforms.ListTransform) transform; + return ListPartitioningDTO.of( + listTransform.fieldNames(), (ListPartitionDTO[]) toDTOs(listTransform.assignments())); + } else if (transform instanceof Transforms.RangeTransform) { - return RangePartitioningDTO.of(((Transforms.RangeTransform) transform).fieldName()); + Transforms.RangeTransform rangeTransform = (Transforms.RangeTransform) transform; + return RangePartitioningDTO.of( + rangeTransform.fieldName(), (RangePartitionDTO[]) toDTOs(rangeTransform.assignments())); + } else if (transform instanceof Transforms.ApplyTransform) { return FunctionPartitioningDTO.of(transform.name(), toFunctionArg(transform.arguments())); + } else { throw new IllegalArgumentException("Unsupported transform: " + transform.name()); } @@ -840,9 +850,19 @@ public static Transform fromDTO(Partitioning partitioning) { ((TruncatePartitioningDTO) partitioning).width(), ((TruncatePartitioningDTO) partitioning).fieldName()); case LIST: - return Transforms.list(((ListPartitioningDTO) partitioning).fieldNames()); + ListPartitioningDTO listPartitioningDTO = (ListPartitioningDTO) partitioning; + ListPartition[] listPartitions = + Arrays.stream(listPartitioningDTO.assignments()) + .map(p -> (ListPartition) fromDTO(p)) + .toArray(ListPartition[]::new); + return Transforms.list(listPartitioningDTO.fieldNames(), listPartitions); case RANGE: - return Transforms.range(((RangePartitioningDTO) partitioning).fieldName()); + RangePartitioningDTO rangePartitioningDTO = (RangePartitioningDTO) partitioning; + RangePartition[] rangePartitions = + Arrays.stream(rangePartitioningDTO.assignments()) + .map(p -> (RangePartition) fromDTO(p)) + .toArray(RangePartition[]::new); + return Transforms.range(rangePartitioningDTO.fieldName(), rangePartitions); case FUNCTION: return Transforms.apply( ((FunctionPartitioningDTO) partitioning).functionName(), diff --git a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java index 4013f39b869..58b9c7b9be0 100644 --- a/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java +++ b/common/src/main/java/com/datastrato/gravitino/json/JsonUtils.java @@ -80,6 +80,7 @@ public class JsonUtils { private static final String STRATEGY = "strategy"; private static final String FIELD_NAME = "fieldName"; private static final String FIELD_NAMES = "fieldNames"; + private static final String ASSIGNMENTS_NAME = "assignments"; private static final String NUM_BUCKETS = "numBuckets"; private static final String WIDTH = "width"; private static final String FUNCTION_NAME = "funcName"; @@ -429,6 +430,127 @@ private static void writeFunctionArg(FunctionArg arg, JsonGenerator gen) throws gen.writeEndObject(); } + private static void writePartition(PartitionDTO value, JsonGenerator gen) throws IOException { + gen.writeStartObject(); + gen.writeStringField(PARTITION_TYPE, value.type().name().toLowerCase()); + gen.writeStringField(PARTITION_NAME, value.name()); + switch (value.type()) { + case IDENTITY: + IdentityPartitionDTO identityPartitionDTO = (IdentityPartitionDTO) value; + gen.writeFieldName(FIELD_NAMES); + gen.writeObject(identityPartitionDTO.fieldNames()); + gen.writeArrayFieldStart(IDENTITY_PARTITION_VALUES); + for (LiteralDTO literal : identityPartitionDTO.values()) { + writeFunctionArg(literal, gen); + } + gen.writeEndArray(); + break; + case LIST: + ListPartitionDTO listPartitionDTO = (ListPartitionDTO) value; + gen.writeArrayFieldStart(LIST_PARTITION_LISTS); + for (LiteralDTO[] literals : listPartitionDTO.lists()) { + gen.writeStartArray(); + for (LiteralDTO literal : literals) { + writeFunctionArg(literal, gen); + } + gen.writeEndArray(); + } + gen.writeEndArray(); + break; + case RANGE: + RangePartitionDTO rangePartitionDTO = (RangePartitionDTO) value; + gen.writeFieldName(RANGE_PARTITION_UPPER); + writeFunctionArg(rangePartitionDTO.upper(), gen); + gen.writeFieldName(RANGE_PARTITION_LOWER); + writeFunctionArg(rangePartitionDTO.lower(), gen); + break; + default: + throw new IOException("Unknown partition type: " + value.type()); + } + gen.writeObjectField(PARTITION_PROPERTIES, value.properties()); + gen.writeEndObject(); + } + + private static PartitionDTO readPartition(JsonNode node) { + Preconditions.checkArgument( + node != null && !node.isNull() && node.isObject(), + "Partition must be a valid JSON object, but found: %s", + node); + Preconditions.checkArgument( + node.has(PARTITION_TYPE), "Partition must have a type field, but found: %s", node); + String type = getString(PARTITION_TYPE, node); + switch (PartitionDTO.Type.valueOf(type.toUpperCase())) { + case IDENTITY: + Preconditions.checkArgument( + node.has(FIELD_NAMES) && node.get(FIELD_NAMES).isArray(), + "Identity partition must have array of fieldNames, but found: %s", + node); + Preconditions.checkArgument( + node.has(IDENTITY_PARTITION_VALUES) && node.get(IDENTITY_PARTITION_VALUES).isArray(), + "Identity partition must have array of values, but found: %s", + node); + + List fieldNames = Lists.newArrayList(); + node.get(FIELD_NAMES).forEach(field -> fieldNames.add(getStringArray((ArrayNode) field))); + List values = Lists.newArrayList(); + node.get(IDENTITY_PARTITION_VALUES) + .forEach(value -> values.add((LiteralDTO) readFunctionArg(value))); + return IdentityPartitionDTO.builder() + .withName(getStringOrNull(PARTITION_NAME, node)) + .withFieldNames(fieldNames.toArray(new String[0][0])) + .withValues(values.toArray(new LiteralDTO[0])) + .withProperties(getStringMapOrNull(PARTITION_PROPERTIES, node)) + .build(); + + case LIST: + Preconditions.checkArgument( + node.has(PARTITION_NAME), "List partition must have name, but found: %s", node); + Preconditions.checkArgument( + node.has(LIST_PARTITION_LISTS) && node.get(LIST_PARTITION_LISTS).isArray(), + "List partition must have array of lists, but found: %s", + node); + + List lists = Lists.newArrayList(); + node.get(LIST_PARTITION_LISTS) + .forEach( + list -> { + List literals = Lists.newArrayList(); + list.forEach(literal -> literals.add((LiteralDTO) readFunctionArg(literal))); + lists.add(literals.toArray(new LiteralDTO[0])); + }); + + return ListPartitionDTO.builder() + .withName(getStringOrNull(PARTITION_NAME, node)) + .withLists(lists.toArray(new LiteralDTO[0][0])) + .withProperties(getStringMapOrNull(PARTITION_PROPERTIES, node)) + .build(); + + case RANGE: + Preconditions.checkArgument( + node.has(PARTITION_NAME), "Range partition must have name, but found: %s", node); + Preconditions.checkArgument( + node.has(RANGE_PARTITION_UPPER), + "Range partition must have upper, but found: %s", + node); + Preconditions.checkArgument( + node.has(RANGE_PARTITION_LOWER), + "Range partition must have lower, but found: %s", + node); + + LiteralDTO upper = (LiteralDTO) readFunctionArg(node.get(RANGE_PARTITION_UPPER)); + LiteralDTO lower = (LiteralDTO) readFunctionArg(node.get(RANGE_PARTITION_LOWER)); + return RangePartitionDTO.builder() + .withName(getStringOrNull(PARTITION_NAME, node)) + .withUpper(upper) + .withLower(lower) + .withProperties(getStringMapOrNull(PARTITION_PROPERTIES, node)) + .build(); + + default: + throw new IllegalArgumentException("Unknown partition type: " + type); + } + } + /** * Get a string value from a JSON node property. * @@ -896,11 +1018,21 @@ public void serialize(Partitioning value, JsonGenerator gen, SerializerProvider ListPartitioningDTO listPartitioningDTO = (ListPartitioningDTO) value; gen.writeFieldName(FIELD_NAMES); gen.writeObject(listPartitioningDTO.fieldNames()); + gen.writeArrayFieldStart(ASSIGNMENTS_NAME); + for (ListPartitionDTO listPartitionDTO : listPartitioningDTO.assignments()) { + writePartition(listPartitionDTO, gen); + } + gen.writeEndArray(); break; case RANGE: RangePartitioningDTO rangePartitioningDTO = (RangePartitioningDTO) value; gen.writeFieldName(FIELD_NAME); gen.writeObject(rangePartitioningDTO.fieldName()); + gen.writeArrayFieldStart(ASSIGNMENTS_NAME); + for (PartitionDTO rangePartitionDTO : rangePartitioningDTO.assignments()) { + writePartition(rangePartitionDTO, gen); + } + gen.writeEndArray(); break; case FUNCTION: FunctionPartitioningDTO funcExpression = (FunctionPartitioningDTO) value; @@ -933,29 +1065,80 @@ public Partitioning deserialize(JsonParser p, DeserializationContext ctxt) throw switch (Partitioning.Strategy.getByName(strategy)) { case IDENTITY: return IdentityPartitioningDTO.of(getStringList(FIELD_NAME, node).toArray(new String[0])); + case YEAR: return YearPartitioningDTO.of(getStringList(FIELD_NAME, node).toArray(new String[0])); + case MONTH: return MonthPartitioningDTO.of(getStringList(FIELD_NAME, node).toArray(new String[0])); + case DAY: return DayPartitioningDTO.of(getStringList(FIELD_NAME, node).toArray(new String[0])); + case HOUR: return HourPartitioningDTO.of(getStringList(FIELD_NAME, node).toArray(new String[0])); + case BUCKET: int numBuckets = getInt(NUM_BUCKETS, node); List fieldNames = Lists.newArrayList(); node.get(FIELD_NAMES).forEach(field -> fieldNames.add(getStringArray((ArrayNode) field))); return BucketPartitioningDTO.of(numBuckets, fieldNames.toArray(new String[0][0])); + case TRUNCATE: int width = getInt(WIDTH, node); return TruncatePartitioningDTO.of( width, getStringList(FIELD_NAME, node).toArray(new String[0])); + case LIST: List listFields = Lists.newArrayList(); node.get(FIELD_NAMES).forEach(field -> listFields.add(getStringArray((ArrayNode) field))); - return ListPartitioningDTO.of(listFields.toArray(new String[0][0])); + + if (!node.hasNonNull(ASSIGNMENTS_NAME)) { + return ListPartitioningDTO.of(listFields.toArray(new String[0][0])); + } + + Preconditions.checkArgument( + node.get(ASSIGNMENTS_NAME).isArray(), + "Cannot parse list partitioning from non-array assignments: %s", + node); + List assignments = Lists.newArrayList(); + node.get(ASSIGNMENTS_NAME) + .forEach( + assignment -> { + PartitionDTO partitionDTO = readPartition(assignment); + Preconditions.checkArgument( + partitionDTO instanceof ListPartitionDTO, + "Cannot parse list partitioning from non-list assignment: %s", + assignment); + assignments.add((ListPartitionDTO) partitionDTO); + }); + return ListPartitioningDTO.of( + listFields.toArray(new String[0][0]), assignments.toArray(new ListPartitionDTO[0])); + case RANGE: - return RangePartitioningDTO.of(getStringList(FIELD_NAME, node).toArray(new String[0])); + String[] fields = getStringList(FIELD_NAME, node).toArray(new String[0]); + if (!node.hasNonNull(ASSIGNMENTS_NAME)) { + return RangePartitioningDTO.of(fields); + } + + Preconditions.checkArgument( + node.get(ASSIGNMENTS_NAME).isArray(), + "Cannot parse range partitioning from non-array assignments: %s", + node); + List rangeAssignments = Lists.newArrayList(); + node.get(ASSIGNMENTS_NAME) + .forEach( + assignment -> { + PartitionDTO partitionDTO = readPartition(assignment); + Preconditions.checkArgument( + partitionDTO instanceof RangePartitionDTO, + "Cannot parse range partitioning from non-range assignment: %s", + assignment); + rangeAssignments.add((RangePartitionDTO) partitionDTO); + }); + return RangePartitioningDTO.of( + fields, rangeAssignments.toArray(new RangePartitionDTO[0])); + case FUNCTION: String functionName = getString(FUNCTION_NAME, node); Preconditions.checkArgument( @@ -965,6 +1148,7 @@ public Partitioning deserialize(JsonParser p, DeserializationContext ctxt) throw List args = Lists.newArrayList(); node.get(FUNCTION_ARGS).forEach(arg -> args.add(readFunctionArg(arg))); return FunctionPartitioningDTO.of(functionName, args.toArray(FunctionArg.EMPTY_ARGS)); + default: throw new IOException("Unknown partitioning strategy: " + strategy); } @@ -1082,44 +1266,7 @@ public static class PartitionDTOSerializer extends JsonSerializer @Override public void serialize(PartitionDTO value, JsonGenerator gen, SerializerProvider serializers) throws IOException { - gen.writeStartObject(); - gen.writeStringField(PARTITION_TYPE, value.type().name().toLowerCase()); - gen.writeStringField(PARTITION_NAME, value.name()); - switch (value.type()) { - case IDENTITY: - IdentityPartitionDTO identityPartitionDTO = (IdentityPartitionDTO) value; - gen.writeFieldName(FIELD_NAMES); - gen.writeObject(identityPartitionDTO.fieldNames()); - gen.writeArrayFieldStart(IDENTITY_PARTITION_VALUES); - for (LiteralDTO literal : identityPartitionDTO.values()) { - writeFunctionArg(literal, gen); - } - gen.writeEndArray(); - break; - case LIST: - ListPartitionDTO listPartitionDTO = (ListPartitionDTO) value; - gen.writeArrayFieldStart(LIST_PARTITION_LISTS); - for (LiteralDTO[] literals : listPartitionDTO.lists()) { - gen.writeStartArray(); - for (LiteralDTO literal : literals) { - writeFunctionArg(literal, gen); - } - gen.writeEndArray(); - } - gen.writeEndArray(); - break; - case RANGE: - RangePartitionDTO rangePartitionDTO = (RangePartitionDTO) value; - gen.writeFieldName(RANGE_PARTITION_UPPER); - writeFunctionArg(rangePartitionDTO.upper(), gen); - gen.writeFieldName(RANGE_PARTITION_LOWER); - writeFunctionArg(rangePartitionDTO.lower(), gen); - break; - default: - throw new IOException("Unknown partition type: " + value.type()); - } - gen.writeObjectField(PARTITION_PROPERTIES, value.properties()); - gen.writeEndObject(); + writePartition(value, gen); } } @@ -1128,83 +1275,7 @@ public static class PartitionDTODeserializer extends JsonDeserializer fieldNames = Lists.newArrayList(); - node.get(FIELD_NAMES).forEach(field -> fieldNames.add(getStringArray((ArrayNode) field))); - List values = Lists.newArrayList(); - node.get(IDENTITY_PARTITION_VALUES) - .forEach(value -> values.add((LiteralDTO) readFunctionArg(value))); - return IdentityPartitionDTO.builder() - .withName(getStringOrNull(PARTITION_NAME, node)) - .withFieldNames(fieldNames.toArray(new String[0][0])) - .withValues(values.toArray(new LiteralDTO[0])) - .withProperties(getStringMapOrNull(PARTITION_PROPERTIES, node)) - .build(); - - case LIST: - Preconditions.checkArgument( - node.has(PARTITION_NAME), "List partition must have name, but found: %s", node); - Preconditions.checkArgument( - node.has(LIST_PARTITION_LISTS) && node.get(LIST_PARTITION_LISTS).isArray(), - "List partition must have array of lists, but found: %s", - node); - - List lists = Lists.newArrayList(); - node.get(LIST_PARTITION_LISTS) - .forEach( - list -> { - List literals = Lists.newArrayList(); - list.forEach(literal -> literals.add((LiteralDTO) readFunctionArg(literal))); - lists.add(literals.toArray(new LiteralDTO[0])); - }); - - return ListPartitionDTO.builder() - .withName(getStringOrNull(PARTITION_NAME, node)) - .withLists(lists.toArray(new LiteralDTO[0][0])) - .withProperties(getStringMapOrNull(PARTITION_PROPERTIES, node)) - .build(); - - case RANGE: - Preconditions.checkArgument( - node.has(PARTITION_NAME), "Range partition must have name, but found: %s", node); - Preconditions.checkArgument( - node.has(RANGE_PARTITION_UPPER), - "Range partition must have upper, but found: %s", - node); - Preconditions.checkArgument( - node.has(RANGE_PARTITION_LOWER), - "Range partition must have lower, but found: %s", - node); - - LiteralDTO upper = (LiteralDTO) readFunctionArg(node.get(RANGE_PARTITION_UPPER)); - LiteralDTO lower = (LiteralDTO) readFunctionArg(node.get(RANGE_PARTITION_LOWER)); - return RangePartitionDTO.builder() - .withName(getStringOrNull(PARTITION_NAME, node)) - .withUpper(upper) - .withLower(lower) - .withProperties(getStringMapOrNull(PARTITION_PROPERTIES, node)) - .build(); - - default: - throw new IOException("Unknown partition type: " + type); - } + return readPartition(node); } } diff --git a/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java b/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java index e1540d9defc..f6b51d7fccf 100644 --- a/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java +++ b/common/src/test/java/com/datastrato/gravitino/json/TestDTOJsonSerDe.java @@ -25,6 +25,8 @@ import com.datastrato.gravitino.dto.rel.partitioning.RangePartitioningDTO; import com.datastrato.gravitino.dto.rel.partitioning.TruncatePartitioningDTO; import com.datastrato.gravitino.dto.rel.partitioning.YearPartitioningDTO; +import com.datastrato.gravitino.dto.rel.partitions.ListPartitionDTO; +import com.datastrato.gravitino.dto.rel.partitions.RangePartitionDTO; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.types.Type; import com.datastrato.gravitino.rel.types.Types; @@ -312,24 +314,66 @@ public void testPartitioningDTOSerDe() throws Exception { Partitioning yearPart = YearPartitioningDTO.of(field1); // construct list partition - // TODO: support assign partition value - // String[][] p1Value = {{"2023-04-01", "San Francisco"}, {"2023-04-01", "San Francisco"}}; - // String[][] p2Value = {{"2023-04-01", "Houston"}, {"2023-04-01", "Dallas"}}; - Partitioning listPart = - ListPartitioningDTO.builder() - .withFieldNames(new String[][] {field1, field2}) - // .withAssignment("p202304_California", p1Value) - // .withAssignment("p202304_Texas", p2Value) + LiteralDTO[][] pCaliforniaValue = { + { + LiteralDTO.builder().withValue("2023-04-01").withDataType(Types.DateType.get()).build(), + LiteralDTO.builder().withValue("San Francisco").withDataType(Types.StringType.get()).build() + }, + { + LiteralDTO.builder().withValue("2023-04-01").withDataType(Types.DateType.get()).build(), + LiteralDTO.builder().withValue("San Diego").withDataType(Types.StringType.get()).build() + } + }; + ListPartitionDTO pCalifornia = + ListPartitionDTO.builder() + .withName("p202304_California") + .withLists(pCaliforniaValue) .build(); + LiteralDTO[][] pTexasValue = { + { + LiteralDTO.builder().withValue("2023-04-01").withDataType(Types.DateType.get()).build(), + LiteralDTO.builder().withValue("Houston").withDataType(Types.StringType.get()).build() + }, + { + LiteralDTO.builder().withValue("2023-04-01").withDataType(Types.DateType.get()).build(), + LiteralDTO.builder().withValue("Dallas").withDataType(Types.StringType.get()).build() + } + }; + ListPartitionDTO pTexas = + ListPartitionDTO.builder().withName("p202304_Texas").withLists(pTexasValue).build(); + + Partitioning listPart = + ListPartitioningDTO.of( + new String[][] {field1, field2}, new ListPartitionDTO[] {pCalifornia, pTexas}); + // construct range partition - // TODO: support assign partition value - Partitioning rangePart = - RangePartitioningDTO.builder() - .withFieldName(field1) - // .withRange("p20230101", "2023-01-01T00:00:00", "2023-01-02T00:00:00") - // .withRange("p20230102", "2023-01-01T00:00:00", null) + RangePartitionDTO p20230101 = + RangePartitionDTO.builder() + .withName("p20230101") + .withUpper( + LiteralDTO.builder() + .withValue("2023-01-01") + .withDataType(Types.DateType.get()) + .build()) + .withLower( + LiteralDTO.builder() + .withValue("2023-01-01") + .withDataType(Types.DateType.get()) + .build()) .build(); + RangePartitionDTO p20230102 = + RangePartitionDTO.builder() + .withName("p20230102") + .withUpper( + LiteralDTO.builder() + .withValue("2023-01-02") + .withDataType(Types.DateType.get()) + .build()) + .withLower(LiteralDTO.NULL) + .build(); + Partitioning rangePart = + RangePartitioningDTO.of(field1, new RangePartitionDTO[] {p20230101, p20230102}); // construct function partitioning, toYYYYMM(toDate(ts, ‘Asia/Shanghai’)) FunctionArg arg1 = FieldReferenceDTO.of(field1); @@ -368,7 +412,7 @@ public void testPartitioningDTOSerDe() throws Exception { } @Test - public void testPartitioningDTOSerDeFail() throws Exception { + public void testPartitioningDTOSerDeFail() { // test `strategy` value null String wrongJson1 = "{\"strategy\": null,\"fieldName\":[\"dt\"]}"; ObjectMapper map = JsonUtils.objectMapper(); @@ -393,5 +437,64 @@ public void testPartitioningDTOSerDeFail() throws Exception { Assertions.assertThrows( IllegalArgumentException.class, () -> map.readValue(wrongJson6, Partitioning.class)); Assertions.assertTrue(exception.getMessage().contains("Invalid partitioning strategy")); + + // test invalid arguments for partitioning + String wrongJson3 = + "{\n" + + " \"strategy\": \"list\",\n" + + " \"fieldNames\": [\n" + + " [\n" + + " \"dt\"\n" + + " ],\n" + + " [\n" + + " \"city\"\n" + + " ]\n" + + " ],\n" + + " \"assignments\": \"partitions\"\n" + + "}"; + exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> map.readValue(wrongJson3, Partitioning.class)); + Assertions.assertTrue( + exception + .getMessage() + .contains("Cannot parse list partitioning from non-array assignments"), + exception.getMessage()); + + String wrong4 = + "{\n" + + " \"strategy\": \"list\",\n" + + " \"fieldNames\": [\n" + + " [\n" + + " \"dt\"\n" + + " ],\n" + + " [\n" + + " \"city\"\n" + + " ]\n" + + " ],\n" + + " \"assignments\": [\n" + + " {\n" + + " \"type\": \"range\",\n" + + " \"name\": \"p20230101\",\n" + + " \"upper\": {\n" + + " \"type\": \"literal\",\n" + + " \"dataType\": \"date\",\n" + + " \"value\": \"2023-01-01\"\n" + + " },\n" + + " \"lower\": {\n" + + " \"type\": \"literal\",\n" + + " \"dataType\": \"date\",\n" + + " \"value\": \"2023-01-01\"\n" + + " },\n" + + " \"properties\": null\n" + + " }\n" + + " ]\n" + + "}"; + exception = + Assertions.assertThrows( + IllegalArgumentException.class, () -> map.readValue(wrong4, Partitioning.class)); + Assertions.assertTrue( + exception.getMessage().contains("Cannot parse list partitioning from non-list assignment"), + exception.getMessage()); } } diff --git a/docs/open-api/partitioning.yaml b/docs/open-api/partitioning.yaml index 61f4a498dbf..0df01a59bd1 100644 --- a/docs/open-api/partitioning.yaml +++ b/docs/open-api/partitioning.yaml @@ -144,6 +144,11 @@ components: - "list" fieldNames: $ref: "./tables.yaml#/components/schemas/FieldNames" + assignments: + type: array + description: The pre-assigned list partitions + items: + $ref: "./partitions.yaml#/components/schemas/ListPartition" RangePartitioning: type: object @@ -157,6 +162,11 @@ components: - "range" fieldName: $ref: "./tables.yaml#/components/schemas/FieldName" + assignments: + type: array + description: The pre-assigned range partitions + items: + $ref: "./partitions.yaml#/components/schemas/RangePartition" FunctionPartitioning: type: object diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestTableOperations.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestTableOperations.java index 6d46bdf6fcb..40851410206 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestTableOperations.java +++ b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestTableOperations.java @@ -27,7 +27,9 @@ import com.datastrato.gravitino.dto.rel.expressions.LiteralDTO; import com.datastrato.gravitino.dto.rel.indexes.IndexDTO; import com.datastrato.gravitino.dto.rel.partitioning.IdentityPartitioningDTO; +import com.datastrato.gravitino.dto.rel.partitioning.ListPartitioningDTO; import com.datastrato.gravitino.dto.rel.partitioning.Partitioning; +import com.datastrato.gravitino.dto.rel.partitions.ListPartitionDTO; import com.datastrato.gravitino.dto.requests.TableCreateRequest; import com.datastrato.gravitino.dto.requests.TableUpdateRequest; import com.datastrato.gravitino.dto.requests.TableUpdatesRequest; @@ -416,6 +418,61 @@ public void testCreatePartitionedTable() { ErrorResponse errorResp3 = resp.readEntity(ErrorResponse.class); Assertions.assertEquals(ErrorConstants.ILLEGAL_ARGUMENTS_CODE, errorResp3.getCode()); Assertions.assertEquals(IllegalArgumentException.class.getSimpleName(), errorResp3.getType()); + + // Test partitioning with assignments + Partitioning[] partitioningWithAssignments = { + ListPartitioningDTO.of( + new String[][] {{"col1"}, {"col2"}}, + new ListPartitionDTO[] { + ListPartitionDTO.builder() + .withName("v1") + .withLists( + new LiteralDTO[][] { + { + LiteralDTO.builder() + .withDataType(Types.StringType.get()) + .withValue("a") + .build(), + LiteralDTO.builder() + .withDataType(Types.ByteType.get()) + .withValue("1") + .build() + } + }) + .build() + }) + }; + Table tableWithAssignments = + mockTable( + "table1", + columns, + "mock comment", + ImmutableMap.of("k1", "v1"), + partitioningWithAssignments); + when(dispatcher.createTable(any(), any(), any(), any(), any(), any(), any(), any())) + .thenReturn(tableWithAssignments); + + req = + new TableCreateRequest( + "table1", + "mock comment", + Arrays.stream(columns).map(DTOConverters::toDTO).toArray(ColumnDTO[]::new), + ImmutableMap.of("k1", "v1"), + SortOrderDTO.EMPTY_SORT, + DistributionDTO.NONE, + partitioningWithAssignments, + IndexDTO.EMPTY_INDEXES); + resp = + target(tablePath(metalake, catalog, schema)) + .request(MediaType.APPLICATION_JSON_TYPE) + .accept("application/vnd.gravitino.v1+json") + .post(Entity.entity(req, MediaType.APPLICATION_JSON_TYPE)); + + Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + + tableResp = resp.readEntity(TableResponse.class); + Assertions.assertEquals(0, tableResp.getCode()); + Assertions.assertArrayEquals(partitioningWithAssignments, tableResp.getTable().partitioning()); } @Test