diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java index 547622452514..5ee0c89f9e43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java @@ -29,7 +29,6 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.sql.JDBCType; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -908,11 +907,20 @@ private static org.apache.avro.Schema getFieldSchema( .collect(Collectors.toList())); break; + case "CHAR": + case "NCHAR": + baseType = + makeJdbcLogicalStringAvroType( + /*fixedLength=*/ true, (int) fieldType.getLogicalType().getArgument()); + break; + case "NVARCHAR": case "VARCHAR": case "LONGNVARCHAR": case "LONGVARCHAR": - baseType = makeJdbcLogicalStringAvroType(fieldType.getLogicalType()); + baseType = + makeJdbcLogicalStringAvroType( + /*fixedLength=*/ false, (int) fieldType.getLogicalType().getArgument()); break; case "DATE": @@ -1311,14 +1319,12 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected, /** Helper factory to build JDBC Logical types for AVRO Schema. */ private static org.apache.avro.Schema makeJdbcLogicalStringAvroType( - Schema.LogicalType logicalType) { - JDBCType jdbcType = JDBCType.valueOf(logicalType.getIdentifier()); - Integer size = logicalType.getArgument(); - + boolean fixedLength, int size) { + String hiveLogicalType = fixedLength ? "char" : "varchar"; String schemaJson = String.format( "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}", - jdbcType.name(), size); + hiveLogicalType, size); return new org.apache.avro.Schema.Parser().parse(schemaJson); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java index 0805168c6f02..c1096cacdaf3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java @@ -564,19 +564,23 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { + " \"type\": \"record\", " + " \"fields\": [{ " + " \"name\": \"my_varchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"VARCHAR\", \"maxLength\": 10}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}" + " }, " + " { " + " \"name\": \"my_longvarchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGVARCHAR\", \"maxLength\": 50}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}" + " }, " + " { " + " \"name\": \"my_nvarchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"NVARCHAR\", \"maxLength\": 10}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}" + " }, " + " { " + " \"name\": \"my_longnvarchar_field\", " - + " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGNVARCHAR\", \"maxLength\": 50}" + + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}" + + " }, " + + " { " + + " \"name\": \"fixed_length_char_field\", " + + " \"type\": {\"type\": \"string\", \"logicalType\": \"char\", \"maxLength\": 25}" + " } " + " ] " + "}"; @@ -597,6 +601,10 @@ public void testJdbcLogicalVarCharRowDataToAvroSchema() { Field.of( "my_longnvarchar_field", FieldType.logicalType(JdbcType.StringType.longnvarchar(50)))) + .addField( + Field.of( + "fixed_length_char_field", + FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25)))) .build(); assertEquals( @@ -812,6 +820,10 @@ private static class JdbcType implements Schema.LogicalType { private static class StringType extends JdbcType { + private static StringType fixedLengthChar(int size) { + return new StringType(JDBCType.CHAR, size); + } + private static StringType varchar(int size) { return new StringType(JDBCType.VARCHAR, size); } diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java index 07e246635745..18acf049ee3c 100644 --- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java +++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java @@ -234,17 +234,24 @@ public void testJdbcLogicalTypesMapValidAvroSchemaIT() { + " \"name\": \"longvarchar_col\"," + " \"type\": {" + " \"type\": \"string\"," - + " \"logicalType\": \"LONGVARCHAR\"," + + " \"logicalType\": \"varchar\"," + " \"maxLength\": 50" + " }" + " }, {" + " \"name\": \"varchar_col\"," + " \"type\": {" + " \"type\": \"string\"," - + " \"logicalType\": \"VARCHAR\"," + + " \"logicalType\": \"varchar\"," + " \"maxLength\": 15" + " }" + " }, {" + + " \"name\": \"fixedlength_char_col\"," + + " \"type\": {" + + " \"type\": \"string\"," + + " \"logicalType\": \"char\"," + + " \"maxLength\": 25" + + " }" + + " }, {" + " \"name\": \"date_col\"," + " \"type\": {" + " \"type\": \"int\"," @@ -264,6 +271,7 @@ public void testJdbcLogicalTypesMapValidAvroSchemaIT() { .addField( "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50)) .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15)) + .addField("fixedlength_char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25)) .addField("date_col", LogicalTypes.JDBC_DATE_TYPE) .addField("time_col", LogicalTypes.JDBC_TIME_TYPE) .build();