diff --git a/CHANGES.md b/CHANGES.md
index 7029cd2eed17..62b8e7105076 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,9 @@
* `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and `AGGREGATE` are now reserved keywords. ([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)).
* Flink 1.13 is now supported by the Flink runner ([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)).
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes:
+ `VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME`
+ (Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).
## Breaking Changes
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
index 77b5445b20bb..0835f9b84450 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
@@ -906,6 +906,26 @@ private static org.apache.avro.Schema getFieldSchema(
.map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
.collect(Collectors.toList()));
break;
+ case "CHAR":
+ case "NCHAR":
+ baseType =
+ buildHiveLogicalTypeSchema("char", (int) fieldType.getLogicalType().getArgument());
+ break;
+ case "NVARCHAR":
+ case "VARCHAR":
+ case "LONGNVARCHAR":
+ case "LONGVARCHAR":
+ baseType =
+ buildHiveLogicalTypeSchema(
+ "varchar", (int) fieldType.getLogicalType().getArgument());
+ break;
+ case "DATE":
+ baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
+ break;
+ case "TIME":
+ baseType =
+ LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
+ break;
default:
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1017,6 +1037,15 @@ private static org.apache.avro.Schema getFieldSchema(
typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()),
oneOfValue.getValue());
}
+ case "NVARCHAR":
+ case "VARCHAR":
+ case "LONGNVARCHAR":
+ case "LONGVARCHAR":
+ return new Utf8((String) value);
+ case "DATE":
+ return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
+ case "TIME":
+ return (int) ((Instant) value).getMillis();
default:
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1277,4 +1306,18 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected,
checkArgument(
got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, expected);
}
+
+ /**
+ * Helper factory to build Avro Logical types schemas for SQL *CHAR types. This method represents
+ * the logical as Hive does.
+ */
+ private static org.apache.avro.Schema buildHiveLogicalTypeSchema(
+ String hiveLogicalType, int size) {
+ String schemaJson =
+ String.format(
+ "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}",
+ hiveLogicalType, size);
+ return new org.apache.avro.Schema.Parser().parse(schemaJson);
+ }
}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
index 627d555f6798..c1096cacdaf3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
@@ -26,6 +26,7 @@
import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import org.apache.avro.Conversions;
@@ -57,8 +58,13 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+import org.joda.time.Instant;
+import org.joda.time.LocalTime;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -550,6 +556,164 @@ public void testUnionFieldInBeamSchema() {
assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema));
}
+ @Test
+ public void testJdbcLogicalVarCharRowDataToAvroSchema() {
+ String expectedAvroSchemaJson =
+ "{ "
+ + " \"name\": \"topLevelRecord\", "
+ + " \"type\": \"record\", "
+ + " \"fields\": [{ "
+ + " \"name\": \"my_varchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_longvarchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_nvarchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_longnvarchar_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}"
+ + " }, "
+ + " { "
+ + " \"name\": \"fixed_length_char_field\", "
+ + " \"type\": {\"type\": \"string\", \"logicalType\": \"char\", \"maxLength\": 25}"
+ + " } "
+ + " ] "
+ + "}";
+
+ Schema beamSchema =
+ Schema.builder()
+ .addField(
+ Field.of(
+ "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
+ .addField(
+ Field.of(
+ "my_longvarchar_field",
+ FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
+ .addField(
+ Field.of(
+ "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
+ .addField(
+ Field.of(
+ "my_longnvarchar_field",
+ FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
+ .addField(
+ Field.of(
+ "fixed_length_char_field",
+ FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25))))
+ .build();
+
+ assertEquals(
+ new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
+ AvroUtils.toAvroSchema(beamSchema));
+ }
+
+ @Test
+ public void testJdbcLogicalVarCharRowDataToGenericRecord() {
+ Schema beamSchema =
+ Schema.builder()
+ .addField(
+ Field.of(
+ "my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
+ .addField(
+ Field.of(
+ "my_longvarchar_field",
+ FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
+ .addField(
+ Field.of(
+ "my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
+ .addField(
+ Field.of(
+ "my_longnvarchar_field",
+ FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
+ .build();
+
+ Row rowData =
+ Row.withSchema(beamSchema)
+ .addValue("varchar_value")
+ .addValue("longvarchar_value")
+ .addValue("nvarchar_value")
+ .addValue("longnvarchar_value")
+ .build();
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+ GenericRecord expectedRecord =
+ new GenericRecordBuilder(avroSchema)
+ .set("my_varchar_field", "varchar_value")
+ .set("my_longvarchar_field", "longvarchar_value")
+ .set("my_nvarchar_field", "nvarchar_value")
+ .set("my_longnvarchar_field", "longnvarchar_value")
+ .build();
+
+ assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
+ }
+
+ @Test
+ public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() {
+ String expectedAvroSchemaJson =
+ "{ "
+ + " \"name\": \"topLevelRecord\", "
+ + " \"type\": \"record\", "
+ + " \"fields\": [{ "
+ + " \"name\": \"my_date_field\", "
+ + " \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }"
+ + " }, "
+ + " { "
+ + " \"name\": \"my_time_field\", "
+ + " \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" }"
+ + " }"
+ + " ] "
+ + "}";
+
+ Schema beamSchema =
+ Schema.builder()
+ .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
+ .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
+ .build();
+
+ assertEquals(
+ new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
+ AvroUtils.toAvroSchema(beamSchema));
+ }
+
+ @Test
+ public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
+ // Test Fixed clock at
+ DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z");
+
+ Schema beamSchema =
+ Schema.builder()
+ .addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
+ .addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
+ .build();
+
+ Row rowData =
+ Row.withSchema(beamSchema)
+ .addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
+ .addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get()))
+ .build();
+
+ int daysFromEpoch =
+ Days.daysBetween(
+ Instant.EPOCH,
+ testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
+ .getDays();
+ int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay();
+
+ org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
+ GenericRecord expectedRecord =
+ new GenericRecordBuilder(avroSchema)
+ .set("my_date_field", daysFromEpoch)
+ .set("my_time_field", timeSinceMidNight)
+ .build();
+
+ assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
+ }
+
@Test
public void testBeamRowToGenericRecord() {
GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);
@@ -640,4 +804,84 @@ public void testNullSchemas() {
AvroUtils.getFromRowFunction(GenericRecord.class),
AvroUtils.getFromRowFunction(GenericRecord.class));
}
+
+ /** Helper class that simulate JDBC Logical types. */
+ private static class JdbcType implements Schema.LogicalType {
+
+ private static final JdbcType DATE =
+ new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME, "");
+ private static final JdbcType TIME =
+ new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME, "");
+
+ private final String identifier;
+ private final FieldType argumentType;
+ private final FieldType baseType;
+ private final Object argument;
+
+ private static class StringType extends JdbcType {
+
+ private static StringType fixedLengthChar(int size) {
+ return new StringType(JDBCType.CHAR, size);
+ }
+
+ private static StringType varchar(int size) {
+ return new StringType(JDBCType.VARCHAR, size);
+ }
+
+ private static StringType longvarchar(int size) {
+ return new StringType(JDBCType.LONGVARCHAR, size);
+ }
+
+ private static StringType nvarchar(int size) {
+ return new StringType(JDBCType.NVARCHAR, size);
+ }
+
+ private static StringType longnvarchar(int size) {
+ return new StringType(JDBCType.LONGNVARCHAR, size);
+ }
+
+ private StringType(JDBCType type, int size) {
+ super(type, FieldType.INT32, FieldType.STRING, size);
+ }
+ }
+
+ private JdbcType(
+ JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object argument) {
+ this.identifier = jdbcType.getName();
+ this.argumentType = argumentType;
+ this.baseType = baseType;
+ this.argument = argument;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public @Nullable FieldType getArgumentType() {
+ return argumentType;
+ }
+
+ @Override
+ public FieldType getBaseType() {
+ return baseType;
+ }
+
+ @Override
+ @SuppressWarnings("TypeParameterUnusedInFormals")
+ public @Nullable T1 getArgument() {
+ return (T1) argument;
+ }
+
+ @Override
+ public @NonNull T toBaseType(@NonNull T input) {
+ return input;
+ }
+
+ @Override
+ public @NonNull T toInputType(@NonNull T base) {
+ return base;
+ }
+ }
}
diff --git a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
index 7ec9e7bc8e83..18acf049ee3c 100644
--- a/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
+++ b/sdks/java/io/jdbc/src/test/java/org/apache/beam/sdk/io/jdbc/SchemaUtilTest.java
@@ -37,6 +37,7 @@
import java.sql.Timestamp;
import java.sql.Types;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
@@ -223,6 +224,65 @@ public void testBeamRowMapperPrimitiveTypes() throws Exception {
assertEquals(wantRow, haveRow);
}
+ @Test
+ public void testJdbcLogicalTypesMapValidAvroSchemaIT() {
+ String expectedAvroSchema =
+ "{"
+ + " \"type\": \"record\","
+ + " \"name\": \"topLevelRecord\","
+ + " \"fields\": [{"
+ + " \"name\": \"longvarchar_col\","
+ + " \"type\": {"
+ + " \"type\": \"string\","
+ + " \"logicalType\": \"varchar\","
+ + " \"maxLength\": 50"
+ + " }"
+ + " }, {"
+ + " \"name\": \"varchar_col\","
+ + " \"type\": {"
+ + " \"type\": \"string\","
+ + " \"logicalType\": \"varchar\","
+ + " \"maxLength\": 15"
+ + " }"
+ + " }, {"
+ + " \"name\": \"fixedlength_char_col\","
+ + " \"type\": {"
+ + " \"type\": \"string\","
+ + " \"logicalType\": \"char\","
+ + " \"maxLength\": 25"
+ + " }"
+ + " }, {"
+ + " \"name\": \"date_col\","
+ + " \"type\": {"
+ + " \"type\": \"int\","
+ + " \"logicalType\": \"date\""
+ + " }"
+ + " }, {"
+ + " \"name\": \"time_col\","
+ + " \"type\": {"
+ + " \"type\": \"int\","
+ + " \"logicalType\": \"time-millis\""
+ + " }"
+ + " }]"
+ + "}";
+
+ Schema jdbcRowSchema =
+ Schema.builder()
+ .addField(
+ "longvarchar_col", LogicalTypes.variableLengthString(JDBCType.LONGVARCHAR, 50))
+ .addField("varchar_col", LogicalTypes.variableLengthString(JDBCType.VARCHAR, 15))
+ .addField("fixedlength_char_col", LogicalTypes.fixedLengthString(JDBCType.CHAR, 25))
+ .addField("date_col", LogicalTypes.JDBC_DATE_TYPE)
+ .addField("time_col", LogicalTypes.JDBC_TIME_TYPE)
+ .build();
+
+ System.out.println(AvroUtils.toAvroSchema(jdbcRowSchema));
+
+ assertEquals(
+ new org.apache.avro.Schema.Parser().parse(expectedAvroSchema),
+ AvroUtils.toAvroSchema(jdbcRowSchema));
+ }
+
@Test
public void testBeamRowMapperDateTime() throws Exception {
long epochMilli = 1558719710000L;