diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java index acf233faf341..255d411028f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/Schema.java @@ -48,6 +48,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** {@link Schema} describes the fields in {@link Row}. */ @SuppressWarnings({ @@ -681,6 +683,9 @@ public interface LogicalType extends Serializable { @AutoValue @Immutable public abstract static class FieldType implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger(FieldType.class); + // Returns the type of this field. public abstract TypeName getTypeName(); @@ -816,6 +821,13 @@ public static FieldType iterable(FieldType elementType) { /** Create a map type for the given key and value types. */ public static FieldType map(FieldType keyType, FieldType valueType) { + if (FieldType.BYTES.equals(keyType)) { + LOG.warn( + "Using byte arrays as keys in a Map may lead to unexpected behavior and may not work as intended. " + + "Since arrays do not override equals() or hashCode, comparisons will be done on reference equality only. " + + "ByteBuffers, when used as keys, present similar challenges because Row stores ByteBuffer as a byte array. " + + "Consider using a different type of key for more consistent and predictable behavior."); + } return FieldType.forTypeName(TypeName.MAP) .setMapKeyType(keyType) .setMapValueType(valueType) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java index 7bc67526e015..f62a2611a1cf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java @@ -40,7 +40,6 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Assume; -import org.junit.Ignore; import org.junit.Test; /** Unit tests for {@link RowCoder}. */ @@ -285,17 +284,14 @@ public void testConsistentWithEqualsBytesField() throws Exception { } @Test - @Ignore - public void testConsistentWithEqualsMapWithBytesKeyField() throws Exception { + public void testEqualsMapWithBytesKeyFieldWorksOnReferenceEquality() throws Exception { FieldType fieldType = FieldType.map(FieldType.BYTES, FieldType.INT32); Schema schema = Schema.of(Schema.Field.of("f1", fieldType)); RowCoder coder = RowCoder.of(schema); - Map map1 = Collections.singletonMap(new byte[] {1, 2, 3, 4}, 1); - Row row1 = Row.withSchema(schema).addValue(map1).build(); - - Map map2 = Collections.singletonMap(new byte[] {1, 2, 3, 4}, 1); - Row row2 = Row.withSchema(schema).addValue(map2).build(); + Map map = Collections.singletonMap(new byte[] {1, 2, 3, 4}, 1); + Row row1 = Row.withSchema(schema).addValue(map).build(); + Row row2 = Row.withSchema(schema).addValue(map).build(); Assume.assumeTrue(coder.consistentWithEquals());