From 88d4712147911744761cb385b8226c81e283d1fe Mon Sep 17 00:00:00 2001 From: Anant Damle Date: Fri, 21 May 2021 23:47:53 +0800 Subject: [PATCH] [BEAM-12385] Handle VARCHAR and other SQL specific logical types in AvroUtils --- CHANGES.md | 3 + .../beam/sdk/schemas/utils/AvroUtils.java | 43 +++ .../beam/sdk/schemas/utils/AvroUtilsTest.java | 244 ++++++++++++++++++ .../beam/sdk/io/jdbc/SchemaUtilTest.java | 60 +++++ 4 files changed, 350 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 7029cd2eed17..62b8e7105076 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -66,6 +66,9 @@ * `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and `AGGREGATE` are now reserved keywords. ([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)). * Flink 1.13 is now supported by the Flink runner ([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)). * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)). +* Added support for converting Beam Schema to Avro Schema for JDBC logical types: + `CHAR`, `NCHAR`, `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME` + (Java) ([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)). 
## Breaking Changes diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 77b5445b20bb..0835f9b84450 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -906,6 +906,26 @@ private static org.apache.avro.Schema getFieldSchema( .map(x -> getFieldSchema(x.getType(), x.getName(), namespace)) .collect(Collectors.toList())); break; + case "CHAR": + case "NCHAR": + baseType = + buildHiveLogicalTypeSchema("char", (int) fieldType.getLogicalType().getArgument()); + break; + case "NVARCHAR": + case "VARCHAR": + case "LONGNVARCHAR": + case "LONGVARCHAR": + baseType = + buildHiveLogicalTypeSchema( + "varchar", (int) fieldType.getLogicalType().getArgument()); + break; + case "DATE": + baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT)); + break; + case "TIME": + baseType = + LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT)); + break; default: throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); @@ -1017,6 +1037,17 @@ private static org.apache.avro.Schema getFieldSchema( typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()), oneOfValue.getValue()); } + case "CHAR": + case "NCHAR": + case "NVARCHAR": + case "VARCHAR": + case "LONGNVARCHAR": + case "LONGVARCHAR": + return new Utf8((String) value); + case "DATE": + return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays(); + case "TIME": + return (int) ((Instant) value).getMillis(); default: throw new RuntimeException( "Unhandled logical type " + fieldType.getLogicalType().getIdentifier()); @@ -1277,4 +1306,18 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected, checkArgument( got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, 
expected); } + + /** + * Helper factory to build Avro logical type schemas for SQL *CHAR types. This method represents + * the logical type the same way Hive does. + */ + private static org.apache.avro.Schema buildHiveLogicalTypeSchema( + String hiveLogicalType, int size) { + String schemaJson = + String.format( + "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}", + hiveLogicalType, size); + return new org.apache.avro.Schema.Parser().parse(schemaJson); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 627d555f6798..c1096cacdaf3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -26,6 +26,7 @@ import com.pholser.junit.quickcheck.runner.JUnitQuickcheck; import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.sql.JDBCType; import java.util.List; import java.util.Map; import org.apache.avro.Conversions; @@ -57,8 +58,13 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; +import org.joda.time.Days; +import org.joda.time.Instant; +import org.joda.time.LocalTime; import org.junit.Test; import org.junit.runner.RunWith; @@ -550,6 +556,164 @@ public void testUnionFieldInBeamSchema() { assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema)); } + @Test + public void testJdbcLogicalVarCharRowDataToAvroSchema() { + String expectedAvroSchemaJson = + "{ " + + " \"name\": \"topLevelRecord\", " + 
+ " \"type\": \"record\", " + + " \"fields\": [{ " + + " \"name\": \"my_varchar_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}" + + " }, " + + " { " + + " \"name\": \"my_longvarchar_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}" + + " }, " + + " { " + + " \"name\": \"my_nvarchar_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}" + + " }, " + + " { " + + " \"name\": \"my_longnvarchar_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}" + + " }, " + + " { " + + " \"name\": \"fixed_length_char_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"char\", \"maxLength\": 25}" + + " } " + + " ] " + + "}"; + + Schema beamSchema = + Schema.builder() + .addField( + Field.of( + "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10)))) + .addField( + Field.of( + "my_longvarchar_field", + FieldType.logicalType(JdbcType.StringType.longvarchar(50)))) + .addField( + Field.of( + "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10)))) + .addField( + Field.of( + "my_longnvarchar_field", + FieldType.logicalType(JdbcType.StringType.longnvarchar(50)))) + .addField( + Field.of( + "fixed_length_char_field", + FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25)))) + .build(); + + assertEquals( + new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson), + AvroUtils.toAvroSchema(beamSchema)); + } + + @Test + public void testJdbcLogicalVarCharRowDataToGenericRecord() { + Schema beamSchema = + Schema.builder() + .addField( + Field.of( + "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10)))) + .addField( + Field.of( + "my_longvarchar_field", + FieldType.logicalType(JdbcType.StringType.longvarchar(50)))) + .addField( + Field.of( + "my_nvarchar_field", 
FieldType.logicalType(JdbcType.StringType.nvarchar(10)))) + .addField( + Field.of( + "my_longnvarchar_field", + FieldType.logicalType(JdbcType.StringType.longnvarchar(50)))) + .build(); + + Row rowData = + Row.withSchema(beamSchema) + .addValue("varchar_value") + .addValue("longvarchar_value") + .addValue("nvarchar_value") + .addValue("longnvarchar_value") + .build(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord expectedRecord = + new GenericRecordBuilder(avroSchema) + .set("my_varchar_field", "varchar_value") + .set("my_longvarchar_field", "longvarchar_value") + .set("my_nvarchar_field", "nvarchar_value") + .set("my_longnvarchar_field", "longnvarchar_value") + .build(); + + assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); + } + + @Test + public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() { + String expectedAvroSchemaJson = + "{ " + + " \"name\": \"topLevelRecord\", " + + " \"type\": \"record\", " + + " \"fields\": [{ " + + " \"name\": \"my_date_field\", " + + " \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }" + + " }, " + + " { " + + " \"name\": \"my_time_field\", " + + " \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" }" + + " }" + + " ] " + + "}"; + + Schema beamSchema = + Schema.builder() + .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE))) + .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME))) + .build(); + + assertEquals( + new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson), + AvroUtils.toAvroSchema(beamSchema)); + } + + @Test + public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() { + // Test Fixed clock at + DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z"); + + Schema beamSchema = + Schema.builder() + .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE))) + .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME))) + 
.build(); + + Row rowData = + Row.withSchema(beamSchema) + .addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant()) + .addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get())) + .build(); + + int daysFromEpoch = + Days.daysBetween( + Instant.EPOCH, + testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant()) + .getDays(); + int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay(); + + org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema); + GenericRecord expectedRecord = + new GenericRecordBuilder(avroSchema) + .set("my_date_field", daysFromEpoch) + .set("my_time_field", timeSinceMidNight) + .build(); + + assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema)); + } + @Test public void testBeamRowToGenericRecord() { GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null); @@ -640,4 +804,84 @@ public void testNullSchemas() { AvroUtils.getFromRowFunction(GenericRecord.class), AvroUtils.getFromRowFunction(GenericRecord.class)); } + + /** Helper class that simulates JDBC logical types. 
*/ + private static class JdbcType implements Schema.LogicalType { + + private static final JdbcType DATE = + new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME, ""); + private static final JdbcType TIME = + new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME, ""); + + private final String identifier; + private final FieldType argumentType; + private final FieldType baseType; + private final Object argument; + + private static class StringType extends JdbcType { + + private static StringType fixedLengthChar(int size) { + return new StringType(JDBCType.CHAR, size); + } + + private static StringType varchar(int size) { + return new StringType(JDBCType.VARCHAR, size); + } + + private static StringType longvarchar(int size) { + return new StringType(JDBCType.LONGVARCHAR, size); + } + + private static StringType nvarchar(int size) { + return new StringType(JDBCType.NVARCHAR, size); + } + + private static StringType longnvarchar(int size) { + return new StringType(JDBCType.LONGNVARCHAR, size); + } + + private StringType(JDBCType type, int size) { + super(type, FieldType.INT32, FieldType.STRING, size); + } + } + + private JdbcType( + JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object argument) { + this.identifier = jdbcType.getName(); + this.argumentType = argumentType; + this.baseType = baseType; + this.argument = argument; + } + + @Override + public String getIdentifier() { + return identifier; + } + + @Override + public @Nullable FieldType getArgumentType() { + return argumentType; + } + + @Override + public FieldType getBaseType() { + return baseType; + } + + @Override + @SuppressWarnings("TypeParameterUnusedInFormals") + public @Nullable T1 getArgument() { + return (T1) argument; + } + + @Override + public @NonNull T toBaseType(@NonNull T input) { + return input; + } + + @Override + public @NonNull T toInputType(@NonNull T base) { + return base; + } + } } diff --git 
a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java index 7ec9e7bc8e83..18acf049ee3c 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java @@ -37,6 +37,7 @@ import java.sql.Timestamp; import java.sql.Types; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.AvroUtils; import org.apache.beam.sdk.values.Row; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.DateTime; @@ -223,6 +224,63 @@ public void testBeamRowMapperPrimitiveTypes() throws Exception { assertEquals(wantRow, haveRow); } + @Test + public void testJdbcLogicalTypesMapValidAvroSchemaIT() { + String expectedAvroSchema = + "{" + + " \"type\": \"record\"," + + " \"name\": \"topLevelRecord\"," + + " \"fields\": [{" + + " \"name\": \"longvarchar_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"varchar\"," + + " \"maxLength\": 50" + + " }" + + " }, {" + + " \"name\": \"varchar_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"varchar\"," + + " \"maxLength\": 15" + + " }" + + " }, {" + + " \"name\": \"fixedlength_char_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"char\"," + + " \"maxLength\": 25" + + " }" + + " }, {" + + " \"name\": \"date_col\"," + + " \"type\": {" + + " \"type\": \"int\"," + + " \"logicalType\": \"date\"" + + " }" + + " }, {" + + " \"name\": \"time_col\"," + + " \"type\": {" + + " \"type\": \"int\"," + + " \"logicalType\": \"time-millis\"" + + " }" + + " }]" + + "}"; + + Schema jdbcRowSchema = + Schema.builder() + .addField( + "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50)) + .addField("varchar_col", 
LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15)) + .addField("fixedlength_char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25)) + .addField("date_col", LogicalTypes.JDBC_DATE_TYPE) + .addField("time_col", LogicalTypes.JDBC_TIME_TYPE) + .build(); + + assertEquals( + new org.apache.avro.Schema.Parser().parse(expectedAvroSchema), + AvroUtils.toAvroSchema(jdbcRowSchema)); + } + @Test public void testBeamRowMapperDateTime() throws Exception { long epochMilli = 1558719710000L;