Skip to content

Commit

Permalink
Consolidate ColumnTransform definition
Browse files Browse the repository at this point in the history
Let factory method be fully responsible for defining `ColumnTransform`.
  • Loading branch information
findepi committed Nov 4, 2021
1 parent b4c10be commit efa088e
Showing 1 changed file with 122 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,87 +78,99 @@ public static ColumnTransform getColumnTransform(PartitionField field, Type type

switch (transform) {
case "identity":
return new ColumnTransform(type, Function.identity());
return identity(type);
case "year":
if (type.equals(DATE)) {
return new ColumnTransform(INTEGER, PartitionTransforms::yearsFromDate);
return yearsFromDate();
}
if (type.equals(TIMESTAMP_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::yearsFromTimestamp);
return yearsFromTimestamp();
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::yearsFromTimestampWithTimeZone);
return yearsFromTimestampWithTimeZone();
}
throw new UnsupportedOperationException("Unsupported type for 'year': " + field);
case "month":
if (type.equals(DATE)) {
return new ColumnTransform(INTEGER, PartitionTransforms::monthsFromDate);
return monthsFromDate();
}
if (type.equals(TIMESTAMP_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::monthsFromTimestamp);
return monthsFromTimestamp();
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::monthsFromTimestampWithTimeZone);
return monthsFromTimestampWithTimeZone();
}
throw new UnsupportedOperationException("Unsupported type for 'month': " + field);
case "day":
if (type.equals(DATE)) {
return new ColumnTransform(INTEGER, PartitionTransforms::daysFromDate);
return daysFromDate();
}
if (type.equals(TIMESTAMP_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::daysFromTimestamp);
return daysFromTimestamp();
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::daysFromTimestampWithTimeZone);
return daysFromTimestampWithTimeZone();
}
throw new UnsupportedOperationException("Unsupported type for 'day': " + field);
case "hour":
if (type.equals(TIMESTAMP_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::hoursFromTimestamp);
return hoursFromTimestamp();
}
if (type.equals(TIMESTAMP_TZ_MICROS)) {
return new ColumnTransform(INTEGER, PartitionTransforms::hoursFromTimestampWithTimeZone);
return hoursFromTimestampWithTimeZone();
}
throw new UnsupportedOperationException("Unsupported type for 'hour': " + field);
case "void":
return new ColumnTransform(type, getVoidTransform(type));
return voidTransform(type);
}

Matcher matcher = BUCKET_PATTERN.matcher(transform);
if (matcher.matches()) {
int count = parseInt(matcher.group(1));
return new ColumnTransform(INTEGER, getBucketTransform(type, count));
return bucket(type, count);
}

matcher = TRUNCATE_PATTERN.matcher(transform);
if (matcher.matches()) {
int width = parseInt(matcher.group(1));
if (type.equals(INTEGER)) {
return new ColumnTransform(INTEGER, block -> truncateInteger(block, width));
return truncateInteger(width);
}
if (type.equals(BIGINT)) {
return new ColumnTransform(BIGINT, block -> truncateBigint(block, width));
return truncateBigint(width);
}
if (isShortDecimal(type)) {
DecimalType decimal = (DecimalType) type;
return new ColumnTransform(type, block -> truncateShortDecimal(decimal, block, width));
return truncateShortDecimal(type, width, decimal);
}
if (isLongDecimal(type)) {
DecimalType decimal = (DecimalType) type;
return new ColumnTransform(type, block -> truncateLongDecimal(decimal, block, width));
return truncateLongDecimal(type, width, decimal);
}
if (type instanceof VarcharType) {
return new ColumnTransform(VARCHAR, block -> truncateVarchar(block, width));
return truncateVarchar(width);
}
if (type.equals(VARBINARY)) {
return new ColumnTransform(VARBINARY, block -> truncateVarbinary(block, width));
return truncateVarbinary(width);
}
throw new UnsupportedOperationException("Unsupported type for 'truncate': " + field);
}

throw new UnsupportedOperationException("Unsupported partition transform: " + field);
}

private static ColumnTransform identity(Type type)
{
return new ColumnTransform(type, Function.identity());
}

private static ColumnTransform bucket(Type type, int count)
{
return new ColumnTransform(
INTEGER,
getBucketTransform(type, count));
}

public static Function<Block, Block> getBucketTransform(Type type, int count)
{
if (type.equals(INTEGER)) {
Expand Down Expand Up @@ -199,19 +211,25 @@ public static Function<Block, Block> getBucketTransform(Type type, int count)
throw new UnsupportedOperationException("Unsupported type for 'bucket': " + type);
}

private static Block yearsFromDate(Block block)
private static ColumnTransform yearsFromDate()
{
return extractDate(block, value -> epochYear(DAYS.toMillis(value)));
return new ColumnTransform(
INTEGER,
block -> extractDate(block, value -> epochYear(DAYS.toMillis(value))));
}

private static Block monthsFromDate(Block block)
private static ColumnTransform monthsFromDate()
{
return extractDate(block, value -> epochMonth(DAYS.toMillis(value)));
return new ColumnTransform(
INTEGER,
block -> extractDate(block, value -> epochMonth(DAYS.toMillis(value))));
}

private static Block daysFromDate(Block block)
private static ColumnTransform daysFromDate()
{
return extractDate(block, LongUnaryOperator.identity());
return new ColumnTransform(
INTEGER,
block -> extractDate(block, LongUnaryOperator.identity()));
}

private static Block extractDate(Block block, LongUnaryOperator function)
Expand All @@ -228,24 +246,32 @@ private static Block extractDate(Block block, LongUnaryOperator function)
return builder.build();
}

private static Block yearsFromTimestamp(Block block)
private static ColumnTransform yearsFromTimestamp()
{
return extractTimestamp(block, PartitionTransforms::epochYear);
return new ColumnTransform(
INTEGER,
block -> extractTimestamp(block, PartitionTransforms::epochYear));
}

private static Block monthsFromTimestamp(Block block)
private static ColumnTransform monthsFromTimestamp()
{
return extractTimestamp(block, PartitionTransforms::epochMonth);
return new ColumnTransform(
INTEGER,
block -> extractTimestamp(block, PartitionTransforms::epochMonth));
}

private static Block daysFromTimestamp(Block block)
private static ColumnTransform daysFromTimestamp()
{
return extractTimestamp(block, PartitionTransforms::epochDay);
return new ColumnTransform(
INTEGER,
block -> extractTimestamp(block, PartitionTransforms::epochDay));
}

private static Block hoursFromTimestamp(Block block)
private static ColumnTransform hoursFromTimestamp()
{
return extractTimestamp(block, PartitionTransforms::epochHour);
return new ColumnTransform(
INTEGER,
block -> extractTimestamp(block, PartitionTransforms::epochHour));
}

private static Block extractTimestamp(Block block, LongUnaryOperator function)
Expand All @@ -263,24 +289,32 @@ private static Block extractTimestamp(Block block, LongUnaryOperator function)
return builder.build();
}

private static Block yearsFromTimestampWithTimeZone(Block block)
private static ColumnTransform yearsFromTimestampWithTimeZone()
{
return extractTimestampWithTimeZone(block, PartitionTransforms::epochYear);
return new ColumnTransform(
INTEGER,
block -> extractTimestampWithTimeZone(block, PartitionTransforms::epochYear));
}

private static Block monthsFromTimestampWithTimeZone(Block block)
private static ColumnTransform monthsFromTimestampWithTimeZone()
{
return extractTimestampWithTimeZone(block, PartitionTransforms::epochMonth);
return new ColumnTransform(
INTEGER,
block -> extractTimestampWithTimeZone(block, PartitionTransforms::epochMonth));
}

private static Block daysFromTimestampWithTimeZone(Block block)
private static ColumnTransform daysFromTimestampWithTimeZone()
{
return extractTimestampWithTimeZone(block, PartitionTransforms::epochDay);
return new ColumnTransform(
INTEGER,
block -> extractTimestampWithTimeZone(block, PartitionTransforms::epochDay));
}

private static Block hoursFromTimestampWithTimeZone(Block block)
private static ColumnTransform hoursFromTimestampWithTimeZone()
{
return extractTimestampWithTimeZone(block, PartitionTransforms::epochHour);
return new ColumnTransform(
INTEGER,
block -> extractTimestampWithTimeZone(block, PartitionTransforms::epochHour));
}

private static Block extractTimestampWithTimeZone(Block block, LongUnaryOperator function)
Expand Down Expand Up @@ -388,6 +422,13 @@ private static int bucketHash(Slice value)
return Murmur3Hash32.hash(value);
}

private static ColumnTransform truncateInteger(int width)
{
return new ColumnTransform(
INTEGER,
block -> truncateInteger(block, width));
}

private static Block truncateInteger(Block block, int width)
{
BlockBuilder builder = INTEGER.createFixedSizeBlockBuilder(block.getPositionCount());
Expand All @@ -403,6 +444,13 @@ private static Block truncateInteger(Block block, int width)
return builder.build();
}

private static ColumnTransform truncateBigint(int width)
{
return new ColumnTransform(
BIGINT,
block -> truncateBigint(block, width));
}

private static Block truncateBigint(Block block, int width)
{
BlockBuilder builder = BIGINT.createFixedSizeBlockBuilder(block.getPositionCount());
Expand All @@ -418,6 +466,13 @@ private static Block truncateBigint(Block block, int width)
return builder.build();
}

private static ColumnTransform truncateShortDecimal(Type type, int width, DecimalType decimal)
{
return new ColumnTransform(
type,
block -> truncateShortDecimal(decimal, block, width));
}

private static Block truncateShortDecimal(DecimalType type, Block block, int width)
{
BigInteger unscaledWidth = BigInteger.valueOf(width);
Expand All @@ -435,6 +490,13 @@ private static Block truncateShortDecimal(DecimalType type, Block block, int wid
return builder.build();
}

private static ColumnTransform truncateLongDecimal(Type type, int width, DecimalType decimal)
{
return new ColumnTransform(
type,
block -> truncateLongDecimal(decimal, block, width));
}

private static Block truncateLongDecimal(DecimalType type, Block block, int width)
{
BigInteger unscaledWidth = BigInteger.valueOf(width);
Expand Down Expand Up @@ -463,6 +525,13 @@ private static BigDecimal truncateDecimal(BigDecimal value, BigInteger unscaledW
return value.subtract(remainder);
}

private static ColumnTransform truncateVarchar(int width)
{
return new ColumnTransform(
VARCHAR,
block -> truncateVarchar(block, width));
}

private static Block truncateVarchar(Block block, int max)
{
BlockBuilder builder = VARCHAR.createBlockBuilder(null, block.getPositionCount());
Expand Down Expand Up @@ -490,6 +559,13 @@ private static Slice truncateVarchar(Slice value, int max)
return value.slice(0, end);
}

private static ColumnTransform truncateVarbinary(int width)
{
return new ColumnTransform(
VARBINARY,
block -> truncateVarbinary(block, width));
}

private static Block truncateVarbinary(Block block, int max)
{
BlockBuilder builder = VARBINARY.createBlockBuilder(null, block.getPositionCount());
Expand All @@ -505,10 +581,12 @@ private static Block truncateVarbinary(Block block, int max)
return builder.build();
}

public static Function<Block, Block> getVoidTransform(Type type)
private static ColumnTransform voidTransform(Type type)
{
Block nullBlock = nativeValueToBlock(type, null);
return block -> new RunLengthEncodedBlock(nullBlock, block.getPositionCount());
return new ColumnTransform(
type,
block -> new RunLengthEncodedBlock(nullBlock, block.getPositionCount()));
}

@VisibleForTesting
Expand Down

0 comments on commit efa088e

Please sign in to comment.