Skip to content

Commit

Permalink
Extract Iceberg bucketing method
Browse files Browse the repository at this point in the history
Co-authored-by: Piotr Findeisen <[email protected]>
  • Loading branch information
wendigo and findepi committed Jul 19, 2021
1 parent 07c6e73 commit ba19a0f
Showing 1 changed file with 38 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,39 +122,7 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type type
Matcher matcher = BUCKET_PATTERN.matcher(transform);
if (matcher.matches()) {
int count = parseInt(matcher.group(1));
if (type.equals(INTEGER)) {
return new ColumnTransform(INTEGER, block -> bucketInteger(block, count));
}
if (type.equals(BIGINT)) {
return new ColumnTransform(INTEGER, block -> bucketBigint(block, count));
}
if (isShortDecimal(type)) {
DecimalType decimal = (DecimalType) type;
return new ColumnTransform(INTEGER, block -> bucketShortDecimal(decimal, block, count));
}
if (isLongDecimal(type)) {
DecimalType decimal = (DecimalType) type;
return new ColumnTransform(INTEGER, block -> bucketLongDecimal(decimal, block, count));
}
if (type.equals(DATE)) {
return new ColumnTransform(INTEGER, block -> bucketDate(block, count));
}
if (type.equals(TIME_MICROS)) {
return new ColumnTransform(INTEGER, block -> bucketTime(block, count));
}
if (type.equals(TIMESTAMP_MICROS)) {
return new ColumnTransform(INTEGER, block -> bucketTimestamp(block, count));
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return new ColumnTransform(INTEGER, block -> bucketTimestampWithTimeZone(block, count));
}
if (type instanceof VarcharType) {
return new ColumnTransform(INTEGER, block -> bucketVarchar(block, count));
}
if (type.equals(VARBINARY)) {
return new ColumnTransform(INTEGER, block -> bucketVarbinary(block, count));
}
throw new UnsupportedOperationException("Unsupported type for 'bucket': " + field);
return new ColumnTransform(INTEGER, getBucketTransform(type, count));
}

matcher = TRUNCATE_PATTERN.matcher(transform);
Expand Down Expand Up @@ -186,6 +154,43 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type type
throw new UnsupportedOperationException("Unsupported partition transform: " + field);
}

public static Function<Block, Block> getBucketTransform(Type type, int count)
{
if (type.equals(INTEGER)) {
return block -> bucketInteger(block, count);
}
if (type.equals(BIGINT)) {
return block -> bucketBigint(block, count);
}
if (isShortDecimal(type)) {
DecimalType decimal = (DecimalType) type;
return block -> bucketShortDecimal(decimal, block, count);
}
if (isLongDecimal(type)) {
DecimalType decimal = (DecimalType) type;
return block -> bucketLongDecimal(decimal, block, count);
}
if (type.equals(DATE)) {
return block -> bucketDate(block, count);
}
if (type.equals(TIME_MICROS)) {
return block -> bucketTime(block, count);
}
if (type.equals(TIMESTAMP_MICROS)) {
return block -> bucketTimestamp(block, count);
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return block -> bucketTimestampWithTimeZone(block, count);
}
if (type instanceof VarcharType) {
return block -> bucketVarchar(block, count);
}
if (type.equals(VARBINARY)) {
return block -> bucketVarbinary(block, count);
}
throw new UnsupportedOperationException("Unsupported type for 'bucket': " + type);
}

private static Block yearsFromDate(Block block)
{
return extractDate(block, value -> epochYear(DAYS.toMillis(value)));
Expand Down

0 comments on commit ba19a0f

Please sign in to comment.