From 1efc744204d3677269be1630f3d542ff3fd433e8 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Fri, 21 May 2021 23:47:53 +0800 Subject: [PATCH 1/7] Handle VARCHAR and other JDBC specific logical types in AvroUtils. --- .../beam/sdk/schemas/utils/AvroUtils.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 3f4fc8c22285..98e855f0fab6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -906,6 +906,18 @@ private static org.apache.avro.Schema getFieldSchema( .map(x -> getFieldSchema(x.getType(), x.getName(), namespace)) .collect(Collectors.toList())); break; + + case "NVARCHAR": + case "VARCHAR": + case "LONGNVARCHAR": + case "LONGVARCHAR": + baseType = org.apache.avro.Schema.create(Type.STRING); + break; + + case "DATE": + baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); + break; + default: throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); @@ -1017,6 +1029,16 @@ private static org.apache.avro.Schema getFieldSchema( typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()), oneOfValue.getValue()); } + + case "NVARCHAR": + case "VARCHAR": + case "LONGNVARCHAR": + case "LONGVARCHAR": + return new Utf8((String) value); + + case "DATE": + return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays(); + default: throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); From e2a50f6ea52623094f5cc8336beb2bba661593c3 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Sat, 29 May 2021 14:27:40 +0800 Subject: [PATCH 2/7] Varchar and related string types tests --- .../beam/sdk/schemas/utils/AvroUtilsTest.java | 117 ++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 627d555f6798..b89e645687b5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -26,6 +26,7 @@ import com.pholser.junit.quickcheck.runner.JUnitQuickcheck; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.JDBCType; import java.util.List; import java.util.Map; import org.apache.avro.Conversions; @@ -57,6 +58,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Test; @@ -550,6 +553,80 @@ public void testUnionFieldInBeamSchema() { assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema)); } + @Test + public void testJdbcLogicalVarCharRowDataToAvroSchema() { + String expectedAvroSchemaJson = + "{ " + + " \"name\": \"topLevelRecord\", " + + " \"type\": \"record\", " + + " \"fields\": [{ " + + " \"name\": \"my_varchar_field\", " + + " \"type\": \"string\" " + + " }, " + + " { " + + " \"name\": \"my_longvarchar_field\", " + + " \"type\": \"string\" " + + " }, " + + " { " + + " \"name\": \"my_nvarchar_field\", " + + " \"type\": \"string\" " + + " }, " + + " { " + + " \"name\": \"my_longnvarchar_field\", " + + " \"type\": \"string\" " + + " } " + + " ] " + + "}"; + + Schema beamSchema = + Schema.builder() + .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcStringType.VARCHAR))) + .addField( + Field.of("my_longvarchar_field", FieldType.logicalType(JdbcStringType.LONGVARCHAR))) + .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcStringType.NVARCHAR))) + .addField( + Field.of( + "my_longnvarchar_field", FieldType.logicalType(JdbcStringType.LONGNVARCHAR))) + .build(); + + assertEquals( + new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson), + AvroUtils.toAvroSchema(beamSchema)); + } + + @Test + public void testJdbcLogicalVarCharRowDataToGenericRecord() { + Schema beamSchema = + Schema.builder() + .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcStringType.VARCHAR))) + .addField( + Field.of("my_longvarchar_field", FieldType.logicalType(JdbcStringType.LONGVARCHAR))) + .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcStringType.NVARCHAR))) + .addField( + Field.of( + "my_longnvarchar_field", FieldType.logicalType(JdbcStringType.LONGNVARCHAR))) + .build(); + + Row rowData = + Row.withSchema(beamSchema) + .addValue("varchar_value") + .addValue("longvarchar_value") + .addValue("nvarchar_value") + .addValue("longnvarchar_value") + .build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord expectedRecord = + new GenericRecordBuilder(avroSchema) + .set("my_varchar_field", "varchar_value") + .set("my_longvarchar_field", "longvarchar_value") + .set("my_nvarchar_field", "nvarchar_value") + .set("my_longnvarchar_field", "longnvarchar_value") + .build(); + + assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); + } + @Test public void testBeamRowToGenericRecord() { GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null); @@ -640,4 +717,44 @@ public void testNullSchemas() { AvroUtils.getFromRowFunction(GenericRecord.class), AvroUtils.getFromRowFunction(GenericRecord.class)); } + + /** Helper class that simulates a JDBC Logical String types. */ + private static class JdbcStringType implements Schema.LogicalType { + + private static final JdbcStringType VARCHAR = new JdbcStringType(JDBCType.VARCHAR); + private static final JdbcStringType NVARCHAR = new JdbcStringType(JDBCType.NVARCHAR); + private static final JdbcStringType LONGVARCHAR = new JdbcStringType(JDBCType.LONGVARCHAR); + private static final JdbcStringType LONGNVARCHAR = new JdbcStringType(JDBCType.LONGNVARCHAR); + + private final String identifier; + + private JdbcStringType(JDBCType jdbcType) { + this.identifier = jdbcType.getName(); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public @Nullable FieldType getArgumentType() { + return Schema.FieldType.INT32; + } + + @Override + public FieldType getBaseType() { + return Schema.FieldType.STRING; + } + + @Override + public @NonNull String toBaseType(@NonNull String input) { + return input; + } + + @Override + public @NonNull String toInputType(@NonNull String base) { + return base; + } + } } From bf99251eaf2e439ee77ed812a697c8fabe1a1fd5 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Sat, 29 May 2021 15:58:27 +0800 Subject: [PATCH 3/7] Add tests related to JDBC logical date-time fields. --- .../beam/sdk/schemas/utils/AvroUtils.java | 8 ++ .../beam/sdk/schemas/utils/AvroUtilsTest.java | 119 ++++++++++++++---- 2 files changed, 104 insertions(+), 23 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 98e855f0fab6..15b2d9c94061 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -918,6 +918,11 @@ private static org.apache.avro.Schema getFieldSchema( baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); break; + case "TIME": + baseType = + LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); + break; + default: throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); @@ -1039,6 +1044,9 @@ private static org.apache.avro.Schema getFieldSchema( case "DATE": return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays(); + case "TIME": + return (int) ((Instant) value).getMillis(); + default: throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index b89e645687b5..5e8581482c8f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -62,6 +62,9 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.Days; +import org.joda.time.Instant; +import org.joda.time.LocalTime; import org.junit.Test; import org.junit.runner.RunWith; @@ -580,13 +583,11 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { Schema beamSchema = Schema.builder() - .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcStringType.VARCHAR))) + .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcType.VARCHAR))) + .addField(Field.of("my_longvarchar_field", FieldType.logicalType(JdbcType.LONGVARCHAR))) + .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcType.NVARCHAR))) .addField( - Field.of("my_longvarchar_field", FieldType.logicalType(JdbcStringType.LONGVARCHAR))) - .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcStringType.NVARCHAR))) - .addField( - Field.of( - "my_longnvarchar_field", FieldType.logicalType(JdbcStringType.LONGNVARCHAR))) + Field.of("my_longnvarchar_field", FieldType.logicalType(JdbcType.LONGNVARCHAR))) .build(); assertEquals( @@ -598,13 +599,11 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { public void testJdbcLogicalVarCharRowDataToGenericRecord() { Schema beamSchema = Schema.builder() - .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcStringType.VARCHAR))) - .addField( - Field.of("my_longvarchar_field", FieldType.logicalType(JdbcStringType.LONGVARCHAR))) - .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcStringType.NVARCHAR))) + .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcType.VARCHAR))) + .addField(Field.of("my_longvarchar_field", FieldType.logicalType(JdbcType.LONGVARCHAR))) + .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcType.NVARCHAR))) .addField( - Field.of( - "my_longnvarchar_field", FieldType.logicalType(JdbcStringType.LONGNVARCHAR))) + Field.of("my_longnvarchar_field", FieldType.logicalType(JdbcType.LONGNVARCHAR))) .build(); Row rowData = @@ -627,6 +626,68 @@ public void testJdbcLogicalVarCharRowDataToGenericRecord() { assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); } + @Test + public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() { + String expectedAvroSchemaJson = + "{ " + + " \"name\": \"topLevelRecord\", " + + " \"type\": \"record\", " + + " \"fields\": [{ " + + " \"name\": \"my_date_field\", " + + " \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }" + + " }, " + + " { " + + " \"name\": \"my_time_field\", " + + " \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" }" + + " }" + + " ] " + + "}"; + + Schema beamSchema = + Schema.builder() + .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE))) + .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME))) + .build(); + + assertEquals( + new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson), + AvroUtils.toAvroSchema(beamSchema)); + } + + @Test + public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() { + // Test Fixed clock at + DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z"); + + Schema beamSchema = + Schema.builder() + .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE))) + .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME))) + .build(); + + Row rowData = + Row.withSchema(beamSchema) + .addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant()) + .addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get())) + .build(); + + int daysFromEpoch = + Days.daysBetween( + Instant.EPOCH, + testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant()) + .getDays(); + int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord expectedRecord = + new GenericRecordBuilder(avroSchema) + .set("my_date_field", daysFromEpoch) + .set("my_time_field", timeSinceMidNight) + .build(); + + assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); + } + @Test public void testBeamRowToGenericRecord() { GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null); @@ -718,18 +779,30 @@ public void testNullSchemas() { AvroUtils.getFromRowFunction(GenericRecord.class)); } - /** Helper class that simulates a JDBC Logical String types. */ - private static class JdbcStringType implements Schema.LogicalType { + /** Helper class that simulates a JDBC Logical types. */ + private static class JdbcType implements Schema.LogicalType { - private static final JdbcStringType VARCHAR = new JdbcStringType(JDBCType.VARCHAR); - private static final JdbcStringType NVARCHAR = new JdbcStringType(JDBCType.NVARCHAR); - private static final JdbcStringType LONGVARCHAR = new JdbcStringType(JDBCType.LONGVARCHAR); - private static final JdbcStringType LONGNVARCHAR = new JdbcStringType(JDBCType.LONGNVARCHAR); + private static final JdbcType VARCHAR = + new JdbcType<>(JDBCType.VARCHAR, FieldType.INT32, FieldType.STRING); + private static final JdbcType NVARCHAR = + new JdbcType<>(JDBCType.NVARCHAR, FieldType.INT32, FieldType.STRING); + private static final JdbcType LONGVARCHAR = + new JdbcType<>(JDBCType.LONGVARCHAR, FieldType.INT32, FieldType.STRING); + private static final JdbcType LONGNVARCHAR = + new JdbcType<>(JDBCType.LONGNVARCHAR, FieldType.INT32, FieldType.STRING); + private static final JdbcType DATE = + new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME); + private static final JdbcType TIME = + new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME); private final String identifier; + private final FieldType argumentType; + private final FieldType baseType; - private JdbcStringType(JDBCType jdbcType) { + private JdbcType(JDBCType jdbcType, FieldType argumentType, FieldType baseType) { this.identifier = jdbcType.getName(); + this.argumentType = argumentType; + this.baseType = baseType; } @Override @@ -739,21 +812,21 @@ public String getIdentifier() { @Override public @Nullable FieldType getArgumentType() { - return Schema.FieldType.INT32; + return argumentType; } @Override public FieldType getBaseType() { - return Schema.FieldType.STRING; + return baseType; } @Override - public @NonNull String toBaseType(@NonNull String input) { + public @NonNull T toBaseType(@NonNull T input) { return input; } @Override - public @NonNull String toInputType(@NonNull String base) { + public @NonNull T toInputType(@NonNull T base) { return base; } } From 7758d600b6d48679b891cca75ae3154ab762a471 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Thu, 3 Jun 2021 19:14:15 +0800 Subject: [PATCH 4/7] Make JDBC Strings handling similar to Hive's approach by mapping to LogicalType preserving the string size characteristics. Add comment in CHANGES.md --- CHANGES.md | 1 + .../beam/sdk/schemas/utils/AvroUtils.java | 16 +++- .../beam/sdk/schemas/utils/AvroUtilsTest.java | 90 ++++++++++++++----- .../beam/sdk/io/jdbc/SchemaUtilTest.java | 52 +++++++++++ 4 files changed, 134 insertions(+), 25 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a64695bdc686..0eb1f6fd1e6e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ ## New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes: `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME` (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)). ## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 15b2d9c94061..547622452514 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -29,6 +29,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.sql.JDBCType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -911,7 +912,7 @@ private static org.apache.avro.Schema getFieldSchema( case "VARCHAR": case "LONGNVARCHAR": case "LONGVARCHAR": - baseType = org.apache.avro.Schema.create(Type.STRING); + baseType = makeJdbcLogicalStringAvroType(fieldType.getLogicalType()); break; case "DATE": @@ -1307,4 +1308,17 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected, checkArgument( got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, expected); } + + /** Helper factory to build JDBC Logical types for AVRO Schema. */ + private static org.apache.avro.Schema makeJdbcLogicalStringAvroType( + Schema.LogicalType logicalType) { + JDBCType jdbcType = JDBCType.valueOf(logicalType.getIdentifier()); + Integer size = logicalType.getArgument(); + + String schemaJson = + String.format( + "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}", + jdbcType.name(), size); + return new org.apache.avro.Schema.Parser().parse(schemaJson); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 5e8581482c8f..0805168c6f02 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -564,30 +564,39 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { + " \"type\": \"record\", " + " \"fields\": [{ " + " \"name\": \"my_varchar_field\", " - + " \"type\": \"string\" " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"VARCHAR\", \"maxLength\": 10}" + " }, " + " { " + " \"name\": \"my_longvarchar_field\", " - + " \"type\": \"string\" " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGVARCHAR\", \"maxLength\": 50}" + " }, " + " { " + " \"name\": \"my_nvarchar_field\", " - + " \"type\": \"string\" " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"NVARCHAR\", \"maxLength\": 10}" + " }, " + " { " + " \"name\": \"my_longnvarchar_field\", " - + " \"type\": \"string\" " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGNVARCHAR\", \"maxLength\": 50}" + " } " + " ] " + "}"; Schema beamSchema = Schema.builder() - .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcType.VARCHAR))) - .addField(Field.of("my_longvarchar_field", FieldType.logicalType(JdbcType.LONGVARCHAR))) - .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcType.NVARCHAR))) .addField( - Field.of("my_longnvarchar_field", FieldType.logicalType(JdbcType.LONGNVARCHAR))) + Field.of( + "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10)))) + .addField( + Field.of( + "my_longvarchar_field", + FieldType.logicalType(JdbcType.StringType.longvarchar(50)))) + .addField( + Field.of( + "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10)))) + .addField( + Field.of( + "my_longnvarchar_field", + FieldType.logicalType(JdbcType.StringType.longnvarchar(50)))) .build(); assertEquals( @@ -599,11 +608,20 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { public void testJdbcLogicalVarCharRowDataToGenericRecord() { Schema beamSchema = Schema.builder() - .addField(Field.of("my_varchar_field", FieldType.logicalType(JdbcType.VARCHAR))) - .addField(Field.of("my_longvarchar_field", FieldType.logicalType(JdbcType.LONGVARCHAR))) - .addField(Field.of("my_nvarchar_field", FieldType.logicalType(JdbcType.NVARCHAR))) .addField( - Field.of("my_longnvarchar_field", FieldType.logicalType(JdbcType.LONGNVARCHAR))) + Field.of( + "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10)))) + .addField( + Field.of( + "my_longvarchar_field", + FieldType.logicalType(JdbcType.StringType.longvarchar(50)))) + .addField( + Field.of( + "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10)))) + .addField( + Field.of( + "my_longnvarchar_field", + FieldType.logicalType(JdbcType.StringType.longnvarchar(50)))) .build(); Row rowData = @@ -779,30 +797,48 @@ public void testNullSchemas() { AvroUtils.getFromRowFunction(GenericRecord.class)); } - /** Helper class that simulates a JDBC Logical types. */ + /** Helper class that simulate JDBC Logical types. */ private static class JdbcType implements Schema.LogicalType { - private static final JdbcType VARCHAR = - new JdbcType<>(JDBCType.VARCHAR, FieldType.INT32, FieldType.STRING); - private static final JdbcType NVARCHAR = - new JdbcType<>(JDBCType.NVARCHAR, FieldType.INT32, FieldType.STRING); - private static final JdbcType LONGVARCHAR = - new JdbcType<>(JDBCType.LONGVARCHAR, FieldType.INT32, FieldType.STRING); - private static final JdbcType LONGNVARCHAR = - new JdbcType<>(JDBCType.LONGNVARCHAR, FieldType.INT32, FieldType.STRING); private static final JdbcType DATE = - new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME); + new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME, ""); private static final JdbcType TIME = - new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME); + new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME, ""); private final String identifier; private final FieldType argumentType; private final FieldType baseType; + private final Object argument; + + private static class StringType extends JdbcType { + + private static StringType varchar(int size) { + return new StringType(JDBCType.VARCHAR, size); + } + + private static StringType longvarchar(int size) { + return new StringType(JDBCType.LONGVARCHAR, size); + } + + private static StringType nvarchar(int size) { + return new StringType(JDBCType.NVARCHAR, size); + } - private JdbcType(JDBCType jdbcType, FieldType argumentType, FieldType baseType) { + private static StringType longnvarchar(int size) { + return new StringType(JDBCType.LONGNVARCHAR, size); + } + + private StringType(JDBCType type, int size) { + super(type, FieldType.INT32, FieldType.STRING, size); + } + } + + private JdbcType( + JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object argument) { this.identifier = jdbcType.getName(); this.argumentType = argumentType; this.baseType = baseType; + this.argument = argument; } @Override @@ -820,6 +856,12 @@ public FieldType getBaseType() { return baseType; } + @Override + @SuppressWarnings("TypeParameterUnusedInFormals") + public @Nullable T1 getArgument() { + return (T1) argument; + } + @Override public @NonNull T toBaseType(@NonNull T input) { return input; diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java index 7ec9e7bc8e83..07e246635745 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java @@ -37,6 +37,7 @@ import java.sql.Timestamp; import java.sql.Types; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.DateTime; @@ -223,6 +224,57 @@ public void testBeamRowMapperPrimitiveTypes() throws Exception { assertEquals(wantRow, haveRow); } + @Test + public void testJdbcLogicalTypesMapValidAvroSchemaIT() { + String expectedAvroSchema = + "{" + + " \"type\": \"record\"," + + " \"name\": \"topLevelRecord\"," + + " \"fields\": [{" + + " \"name\": \"longvarchar_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"LONGVARCHAR\"," + + " \"maxLength\": 50" + + " }" + + " }, {" + + " \"name\": \"varchar_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"VARCHAR\"," + + " \"maxLength\": 15" + + " }" + + " }, {" + + " \"name\": \"date_col\"," + + " \"type\": {" + + " \"type\": \"int\"," + + " \"logicalType\": \"date\"" + + " }" + + " }, {" + + " \"name\": \"time_col\"," + + " \"type\": {" + + " \"type\": \"int\"," + + " \"logicalType\": \"time-millis\"" + + " }" + + " }]" + + "}"; + + Schema jdbcRowSchema = + Schema.builder() + .addField( + "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50)) + .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15)) + .addField("date_col", LogicalTypes.JDBC_DATE_TYPE) + .addField("time_col", LogicalTypes.JDBC_TIME_TYPE) + .build(); + + System.out.println(AvroUtils.toAvroSchema(jdbcRowSchema)); + + assertEquals( + new org.apache.avro.Schema.Parser().parse(expectedAvroSchema), + AvroUtils.toAvroSchema(jdbcRowSchema)); + } + @Test public void testBeamRowMapperDateTime() throws Exception { long epochMilli = 1558719710000L; From 0b644852db4f872d93049f9bfff7f1f8a736c845 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Thu, 3 Jun 2021 19:33:06 +0800 Subject: [PATCH 5/7] fix whitespace error in CHANGES.md --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 0eb1f6fd1e6e..f1855a24221b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,7 +64,9 @@ ## New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). -* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes: `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME` (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)). +* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes: + `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME` + (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)). ## Breaking Changes From 499eba0c92b977caed135cbccd2f94e8151edf43 Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Thu, 3 Jun 2021 19:38:00 +0800 Subject: [PATCH 6/7] Actually fixing whitespace error in CHANGES.md --- CHANGES.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f1855a24221b..d2b7c9328ddc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,8 +64,8 @@ ## New Features / Improvements * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). -* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes: - `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME` +* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes: + `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME` (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)). ## Breaking Changes From de939dab8bb572d9d20fbcb40b888276f9a1e42a Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Mon, 7 Jun 2021 10:48:31 +0800 Subject: [PATCH 7/7] Conform to Hive types of SQL char and varchar. --- .../beam/sdk/schemas/utils/AvroUtils.java | 20 ++++++++++++------- .../beam/sdk/schemas/utils/AvroUtilsTest.java | 20 +++++++++++++++---- .../beam/sdk/io/jdbc/SchemaUtilTest.java | 12 +++++++++-- 3 files changed, 39 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 547622452514..5ee0c89f9e43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -29,7 +29,6 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.sql.JDBCType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -908,11 +907,20 @@ private static org.apache.avro.Schema getFieldSchema( .collect(Collectors.toList())); break; + case "CHAR": + case "NCHAR": + baseType = + makeJdbcLogicalStringAvroType( + /*fixedLength=*/ true, (int) fieldType.getLogicalType().getArgument()); + break; + case "NVARCHAR": case "VARCHAR": case "LONGNVARCHAR": case "LONGVARCHAR": - baseType = makeJdbcLogicalStringAvroType(fieldType.getLogicalType()); + baseType = + makeJdbcLogicalStringAvroType( + /*fixedLength=*/ false, (int) fieldType.getLogicalType().getArgument()); break; case "DATE": @@ -1311,14 +1319,12 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected, /** Helper factory to build JDBC Logical types for AVRO Schema. */ private static org.apache.avro.Schema makeJdbcLogicalStringAvroType( - Schema.LogicalType logicalType) { - JDBCType jdbcType = JDBCType.valueOf(logicalType.getIdentifier()); - Integer size = logicalType.getArgument(); - + boolean fixedLength, int size) { + String hiveLogicalType = fixedLength ? "char" : "varchar"; String schemaJson = String.format( "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}", - jdbcType.name(), size); + hiveLogicalType, size); return new org.apache.avro.Schema.Parser().parse(schemaJson); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 0805168c6f02..c1096cacdaf3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -564,19 +564,23 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { + " \"type\": \"record\", " + " \"fields\": [{ " + " \"name\": \"my_varchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"VARCHAR\", \"maxLength\": 10}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}" + " }, " + " { " + " \"name\": \"my_longvarchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGVARCHAR\", \"maxLength\": 50}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}" + " }, " + " { " + " \"name\": \"my_nvarchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"NVARCHAR\", \"maxLength\": 10}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}" + " }, " + " { " + " \"name\": \"my_longnvarchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGNVARCHAR\", \"maxLength\": 50}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}" + + " }, " + + " { " + + " \"name\": \"fixed_length_char_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"char\", \"maxLength\": 25}" + " } " + " ] " + "}"; @@ -597,6 +601,10 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { Field.of( "my_longnvarchar_field", FieldType.logicalType(JdbcType.StringType.longnvarchar(50)))) + .addField( + Field.of( + "fixed_length_char_field", + FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25)))) .build(); assertEquals( @@ -812,6 +820,10 @@ private static class JdbcType implements Schema.LogicalType { private static class StringType extends JdbcType { + private static StringType fixedLengthChar(int size) { + return new StringType(JDBCType.CHAR, size); + } + private static StringType varchar(int size) { return new StringType(JDBCType.VARCHAR, size); } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java index 07e246635745..18acf049ee3c 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java @@ -234,17 +234,24 @@ public void testJdbcLogicalTypesMapValidAvroSchemaIT() { + " \"name\": \"longvarchar_col\"," + " \"type\": {" + " \"type\": \"string\"," - + " \"logicalType\": \"LONGVARCHAR\"," + + " \"logicalType\": \"varchar\"," + " \"maxLength\": 50" + " }" + " }, {" + " \"name\": \"varchar_col\"," + " \"type\": {" + " \"type\": \"string\"," - + " \"logicalType\": \"VARCHAR\"," + + " \"logicalType\": \"varchar\"," + " \"maxLength\": 15" + " }" + " }, {" + + " \"name\": \"fixedlength_char_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"char\"," + + " \"maxLength\": 25" + + " }" + + " }, {" + " \"name\": \"date_col\"," + " \"type\": {" + " \"type\": \"int\"," @@ -264,6 +271,7 @@ public void testJdbcLogicalTypesMapValidAvroSchemaIT() { .addField( "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50)) .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15)) + .addField("fixedlength_char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25)) .addField("date_col", LogicalTypes.JDBC_DATE_TYPE) .addField("time_col", LogicalTypes.JDBC_TIME_TYPE) .build();