Commit

Rename and simplify encode method call
regadas committed Jan 26, 2024
1 parent 0aa8862 commit b818c34
Showing 4 changed files with 71 additions and 74 deletions.
@@ -176,14 +176,16 @@ public BucketMetadata(
this.hashType = hashType;
this.hashFunction = Suppliers.memoize(new HashFunctionSupplier(hashType));
this.bucketIdFn = Suppliers.memoize(BucketIdFnSupplier.create(hashType));
this.primaryKeyEncoder = Suppliers.memoize(EncoderSupplier.create(hashType, keyClass));
this.secondaryKeyEncoder =
Suppliers.memoize(EncoderSupplier.create(hashType, keyClassSecondary));
this.keyCoder = getKeyCoder(keyClass);
this.keyCoderSecondary = keyClassSecondary == null ? null : getKeyCoder(keyClassSecondary);
this.version = version;
this.filenamePrefix =
filenamePrefix != null ? filenamePrefix : SortedBucketIO.DEFAULT_FILENAME_PREFIX;
this.primaryKeyEncoder = Suppliers.memoize(KeyEncoderSupplier.create(hashType, keyCoder));
this.secondaryKeyEncoder =
keyClassSecondary == null
? null
: Suppliers.memoize(KeyEncoderSupplier.create(hashType, keyCoderSecondary));
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
@@ -240,10 +242,10 @@ static BucketIdFn icebergFn() {
}

public interface KeyEncoder<T> extends Serializable {
byte[] encode(T value, Coder<T> coder);
byte[] encode(T value);

static <T> KeyEncoder<T> defaultEncoder() {
return (value, coder) -> {
static <T> KeyEncoder<T> defaultKeyEncoder(Coder<T> coder) {
return value -> {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
coder.encode(value, baos);
@@ -254,8 +256,8 @@ static <T> KeyEncoder<T> defaultEncoder() {
};
}

static <T> KeyEncoder<T> icebergEncoder(Class<T> klass) {
return IcebergEncoder.create(klass);
static <T> KeyEncoder<T> icebergKeyEncoder(Coder<T> coder) {
return IcebergEncoder.create(coder.getEncodedTypeDescriptor().getRawType());
}
}
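
A minimal usage sketch of the renamed API (illustrative, not part of this commit; assumes Beam's StringUtf8Coder): the Coder is now bound once when the KeyEncoder is built, so encode takes only the value.

import org.apache.beam.sdk.coders.StringUtf8Coder;

class KeyEncoderUsageSketch {
  static byte[] encodeKey(String key) {
    // Coder captured at construction; before this commit the call was encoder.encode(key, coder).
    BucketMetadata.KeyEncoder<String> encoder =
        BucketMetadata.KeyEncoder.defaultKeyEncoder(StringUtf8Coder.of());
    return encoder.encode(key);
  }
}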

@@ -285,8 +287,8 @@ public BucketIdFn bucketIdFn() {
}

@Override
public <T> KeyEncoder<T> encoder(Class<T> klass) {
return KeyEncoder.icebergEncoder(klass);
public <T> KeyEncoder<T> keyEncoder(Coder<T> coder) {
return KeyEncoder.icebergKeyEncoder(coder);
}
};

@@ -296,8 +298,8 @@ public BucketIdFn bucketIdFn() {
return BucketIdFn.defaultFn();
}

public <T> KeyEncoder<T> encoder(Class<T> klass) {
return KeyEncoder.defaultEncoder();
public <T> KeyEncoder<T> keyEncoder(Coder<T> coder) {
return KeyEncoder.defaultKeyEncoder(coder);
}
}

@@ -362,7 +364,7 @@ byte[] encodeKeyBytesPrimary(K1 key) {
return null;
}

return primaryKeyEncoder.get().encode(key, keyCoder);
return primaryKeyEncoder.get().encode(key);
}

byte[] getKeyBytesSecondary(V value) {
@@ -372,7 +374,7 @@ byte[] getKeyBytesSecondary(V value) {
return null;
}

return secondaryKeyEncoder.get().encode(key, keyCoderSecondary);
return secondaryKeyEncoder.get().encode(key);
}

// Checks for complete equality between BucketMetadatas originating from the same BucketedInput
@@ -478,9 +480,9 @@ static BucketIdFnSupplier create(String hashType) {
}

@FunctionalInterface
interface EncoderSupplier<T> extends Supplier<KeyEncoder<T>>, Serializable {
static <T> EncoderSupplier<T> create(String hashType, Class<T> klass) {
return () -> HashType.valueOf(hashType).encoder(klass);
interface KeyEncoderSupplier<T> extends Supplier<KeyEncoder<T>>, Serializable {
static <T> KeyEncoderSupplier<T> create(String hashType, Coder<T> coder) {
return () -> HashType.valueOf(hashType).keyEncoder(coder);
}
}
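
A sketch of the call pattern the renamed supplier enables (assumptions: it is used from code where KeyEncoderSupplier is visible, and Supplier/Suppliers are the Guava-style types already used in the constructor above): the supplier captures only the hash type name and the coder, and resolves the actual KeyEncoder lazily on get().

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.beam.sdk.coders.StringUtf8Coder;

class KeyEncoderSupplierSketch {
  // Mirrors the constructor wiring above; the encoder is resolved and cached on first get().
  private final Supplier<BucketMetadata.KeyEncoder<String>> primaryKeyEncoder =
      Suppliers.memoize(KeyEncoderSupplier.create("ICEBERG", StringUtf8Coder.of()));

  byte[] encodePrimary(String key) {
    return primaryKeyEncoder.get().encode(key);
  }
}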

@@ -81,39 +81,39 @@ private static byte[] encode(BigDecimal value) {
return value.unscaledValue().toByteArray();
}

public static <T> BucketMetadata.KeyEncoder<T> create(Class<T> klass) {
if (klass.equals(Integer.class)) {
return (value, coder) -> encode((Integer) value);
public static <T> BucketMetadata.KeyEncoder<T> create(Class<? super T> klass) {
if (klass.isAssignableFrom(Integer.class)) {
return value -> encode((Integer) value);
}
if (klass.equals(Long.class)) {
return (value, coder) -> encode((long) value);
if (klass.isAssignableFrom(Long.class)) {
return value -> encode((long) value);
}
if (klass.equals(BigDecimal.class)) {
return (value, coder) -> encode((BigDecimal) value);
if (klass.isAssignableFrom(BigDecimal.class)) {
return value -> encode((BigDecimal) value);
}
if (klass.equals(CharSequence.class) || klass.equals(String.class)) {
return (value, coder) -> encode((String) value);
if (klass.isAssignableFrom(String.class)) {
return value -> encode((String) value);
}
if (klass.equals(UUID.class)) {
return (value, coder) -> encode((UUID) value);
if (klass.isAssignableFrom(UUID.class)) {
return value -> encode((UUID) value);
}
if (klass.equals(byte[].class)) {
return (value, coder) -> (byte[]) value;
if (klass.isAssignableFrom(byte[].class)) {
return value -> (byte[]) value;
}
if (klass.equals(LocalDate.class)) {
return (value, coder) -> encode((LocalDate) value);
if (klass.isAssignableFrom(LocalDate.class)) {
return value -> encode((LocalDate) value);
}
if (klass.equals(LocalTime.class)) {
return (value, coder) -> encode((LocalTime) value);
if (klass.isAssignableFrom(LocalTime.class)) {
return value -> encode((LocalTime) value);
}
if (klass.equals(LocalDateTime.class)) {
return (value, coder) -> encode((LocalDateTime) value);
if (klass.isAssignableFrom(LocalDateTime.class)) {
return value -> encode((LocalDateTime) value);
}
if (klass.equals(ZonedDateTime.class)) {
return (value, coder) -> encode((ZonedDateTime) value);
if (klass.isAssignableFrom(ZonedDateTime.class)) {
return value -> encode((ZonedDateTime) value);
}
if (klass.equals(Instant.class)) {
return (value, coder) -> encode((Instant) value);
if (klass.isAssignableFrom(Instant.class)) {
return value -> encode((Instant) value);
}

throw new UnsupportedOperationException("Unsupported type: " + klass);
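
The switch from equals to isAssignableFrom makes each branch match when the coder's raw type (obtained via coder.getEncodedTypeDescriptor().getRawType() in icebergKeyEncoder) is the listed class or one of its supertypes, which is why the earlier CharSequence/String special case disappears. In plain Java terms (illustrative only):

class AssignableFromSketch {
  static void demo() {
    // A coder whose raw type is CharSequence now hits the String branch.
    boolean supertypeMatches = CharSequence.class.isAssignableFrom(String.class); // true
    boolean exactMatch = CharSequence.class.equals(String.class);                 // false
  }
}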
@@ -284,11 +284,11 @@ public void testIcebergHashType() throws Exception {
3, numBuckets, numShards, HashType.ICEBERG, DEFAULT_FILENAME_PREFIX);

HashType hashType = HashType.ICEBERG;
BucketMetadata.KeyEncoder<String> encoder = hashType.encoder(String.class);
BucketMetadata.KeyEncoder<String> encoder = hashType.keyEncoder(StringUtf8Coder.of());
String key = "iceberg";
byte[] expected = encoder.encode(key, null);
byte[] expected = encoder.encode(key);
Assert.assertArrayEquals(expected, metadata.encodeKeyBytesPrimary(key));
byte[] expectedSecondary = encoder.encode("c", null);
byte[] expectedSecondary = encoder.encode("c");
Assert.assertArrayEquals(expectedSecondary, metadata.getKeyBytesSecondary(key));

HashCode hashCode = hashType.create().hashBytes(expected);
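
For reference, the Iceberg spec (linked in the hash tests below) maps such a hash to a bucket as (hash & Integer.MAX_VALUE) % N; BucketIdFn.icebergFn() itself is not shown in this diff, so treat this as a sketch of the spec's formula, using the Guava HashFunction returned by hashType.create().

import com.google.common.hash.HashFunction;

class IcebergBucketSketch {
  // Spec formula: bucket_N(key) = (murmur3_x86_32(encodedKey) & Integer.MAX_VALUE) % N
  static int bucketOf(byte[] encodedKey, int numBuckets, HashFunction hasher) {
    int hash = hasher.hashBytes(encodedKey).asInt();
    return (hash & Integer.MAX_VALUE) % numBuckets;
  }
}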
@@ -37,36 +37,35 @@ public void shouldHashValuesAsDescribedInSpec() {
// https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
BucketMetadata.HashType hashType = BucketMetadata.HashType.ICEBERG;
HashFunction hasher = hashType.create();
BucketMetadata.KeyEncoder<Integer> integerKeyEncoder = hashType.encoder(Integer.class);
BucketMetadata.KeyEncoder<Long> longKeyEncoder = hashType.encoder(Long.class);
BucketMetadata.KeyEncoder<BigDecimal> decimalKeyEncoder = hashType.encoder(BigDecimal.class);
BucketMetadata.KeyEncoder<LocalDate> dateKeyEncoder = hashType.encoder(LocalDate.class);
BucketMetadata.KeyEncoder<LocalTime> timeKeyEncoder = hashType.encoder(LocalTime.class);
BucketMetadata.KeyEncoder<Integer> integerKeyEncoder = IcebergEncoder.create(Integer.class);
BucketMetadata.KeyEncoder<Long> longKeyEncoder = IcebergEncoder.create(Long.class);
BucketMetadata.KeyEncoder<BigDecimal> decimalKeyEncoder =
IcebergEncoder.create(BigDecimal.class);
BucketMetadata.KeyEncoder<LocalDate> dateKeyEncoder = IcebergEncoder.create(LocalDate.class);
BucketMetadata.KeyEncoder<LocalTime> timeKeyEncoder = IcebergEncoder.create(LocalTime.class);
BucketMetadata.KeyEncoder<LocalDateTime> dateTimeKeyEncoder =
hashType.encoder(LocalDateTime.class);
IcebergEncoder.create(LocalDateTime.class);
BucketMetadata.KeyEncoder<ZonedDateTime> zonedDateTimeKeyEncoder =
hashType.encoder(ZonedDateTime.class);
BucketMetadata.KeyEncoder<Instant> instantKeyEncoder = hashType.encoder(Instant.class);
BucketMetadata.KeyEncoder<String> stringKeyEncoder = hashType.encoder(String.class);
BucketMetadata.KeyEncoder<UUID> uuidKeyEncoder = hashType.encoder(UUID.class);
BucketMetadata.KeyEncoder<byte[]> bytesKeyEncoder = hashType.encoder(byte[].class);
IcebergEncoder.create(ZonedDateTime.class);
BucketMetadata.KeyEncoder<Instant> instantKeyEncoder = IcebergEncoder.create(Instant.class);
BucketMetadata.KeyEncoder<String> stringKeyEncoder = IcebergEncoder.create(String.class);
BucketMetadata.KeyEncoder<UUID> uuidKeyEncoder = IcebergEncoder.create(UUID.class);
BucketMetadata.KeyEncoder<byte[]> bytesKeyEncoder = IcebergEncoder.create(byte[].class);

assertEquals(2017239379, hasher.hashBytes(integerKeyEncoder.encode(34, null)).asInt());
assertEquals(2017239379, hasher.hashBytes(longKeyEncoder.encode(34L, null)).asInt());
assertEquals(2017239379, hasher.hashBytes(integerKeyEncoder.encode(34)).asInt());
assertEquals(2017239379, hasher.hashBytes(longKeyEncoder.encode(34L)).asInt());
assertEquals(
-500754589,
hasher.hashBytes(decimalKeyEncoder.encode(new BigDecimal("14.20"), null)).asInt());
-500754589, hasher.hashBytes(decimalKeyEncoder.encode(new BigDecimal("14.20"))).asInt());
assertEquals(
-653330422,
hasher.hashBytes(dateKeyEncoder.encode(LocalDate.of(2017, 11, 16), null)).asInt());
-653330422, hasher.hashBytes(dateKeyEncoder.encode(LocalDate.of(2017, 11, 16))).asInt());
assertEquals(
-662762989, hasher.hashBytes(timeKeyEncoder.encode(LocalTime.of(22, 31, 8), null)).asInt());
-662762989, hasher.hashBytes(timeKeyEncoder.encode(LocalTime.of(22, 31, 8))).asInt());
assertEquals(
-2047944441,
hasher
.hashBytes(
dateTimeKeyEncoder.encode(
LocalDateTime.of(LocalDate.of(2017, 11, 16), LocalTime.of(22, 31, 8)), null))
LocalDateTime.of(LocalDate.of(2017, 11, 16), LocalTime.of(22, 31, 8))))
.asInt());
assertEquals(
-2047944441,
@@ -76,8 +75,7 @@ public void shouldHashValuesAsDescribedInSpec() {
ZonedDateTime.of(
LocalDate.of(2017, 11, 16),
LocalTime.of(14, 31, 8),
ZoneOffset.ofHours(-8)),
null))
ZoneOffset.ofHours(-8))))
.asInt());
assertEquals(
-2047944441,
@@ -88,29 +86,26 @@ public void shouldHashValuesAsDescribedInSpec() {
LocalDate.of(2017, 11, 16),
LocalTime.of(14, 31, 8),
ZoneOffset.ofHours(-8))
.toInstant(),
null))
.toInstant()))
.asInt());
assertEquals(1210000089, hasher.hashBytes(stringKeyEncoder.encode("iceberg", null)).asInt());
assertEquals(1210000089, hasher.hashBytes(stringKeyEncoder.encode("iceberg")).asInt());
assertEquals(
1488055340,
hasher
.hashBytes(
uuidKeyEncoder.encode(
UUID.fromString("f79c3e09-677c-4bbd-a479-3f349cb785e7"), null))
uuidKeyEncoder.encode(UUID.fromString("f79c3e09-677c-4bbd-a479-3f349cb785e7")))
.asInt());
assertEquals(
-188683207,
hasher.hashBytes(bytesKeyEncoder.encode(new byte[] {0, 1, 2, 3}, null)).asInt());
-188683207, hasher.hashBytes(bytesKeyEncoder.encode(new byte[] {0, 1, 2, 3})).asInt());
}
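
The identical expected hash (2017239379) for 34 and 34L above follows from the spec: int keys are widened to long and hashed as 8 little-endian bytes. A sketch of that widening/serialization step (an assumption about IcebergEncoder's internals, which this diff does not show):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class LittleEndianLongSketch {
  // 34 and 34L serialize to the same 8 bytes, hence the same murmur3 hash in the test above.
  static byte[] encodeAsLong(long value) {
    return ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
  }
}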

@Test
public void shouldThrowOnEncodingUnsupportedTypes() {
// https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements
BucketMetadata.HashType hashType = BucketMetadata.HashType.ICEBERG;

assertThrows(UnsupportedOperationException.class, () -> hashType.encoder(Boolean.class));
assertThrows(UnsupportedOperationException.class, () -> hashType.encoder(Float.class));
assertThrows(UnsupportedOperationException.class, () -> hashType.encoder(Double.class));
assertThrows(UnsupportedOperationException.class, () -> IcebergEncoder.create(Boolean.class));
assertThrows(UnsupportedOperationException.class, () -> IcebergEncoder.create(Float.class));
assertThrows(UnsupportedOperationException.class, () -> IcebergEncoder.create(Double.class));
}
}
