Skip to content

Commit

Permalink
[BEAM-12385] Handle VARCHAR and other SQL specific logical types in A…
Browse files Browse the repository at this point in the history
…vroUtils
  • Loading branch information
Anant Damle authored and iemejia committed Jun 7, 2021
1 parent 9898493 commit 88d4712
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@
* `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and `AGGREGATE` are now reserved keywords. ([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)).
* Flink 1.13 is now supported by the Flink runner ([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)).
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes:
`VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME`
(Java)([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,26 @@ private static org.apache.avro.Schema getFieldSchema(
.map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
.collect(Collectors.toList()));
break;
case "CHAR":
case "NCHAR":
baseType =
buildHiveLogicalTypeSchema("char", (int) fieldType.getLogicalType().getArgument());
break;
case "NVARCHAR":
case "VARCHAR":
case "LONGNVARCHAR":
case "LONGVARCHAR":
baseType =
buildHiveLogicalTypeSchema(
"varchar", (int) fieldType.getLogicalType().getArgument());
break;
case "DATE":
baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
break;
case "TIME":
baseType =
LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
break;
default:
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
Expand Down Expand Up @@ -1017,6 +1037,15 @@ private static org.apache.avro.Schema getFieldSchema(
typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()),
oneOfValue.getValue());
}
case "NVARCHAR":
case "VARCHAR":
case "LONGNVARCHAR":
case "LONGVARCHAR":
return new Utf8((String) value);
case "DATE":
return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();
case "TIME":
return (int) ((Instant) value).getMillis();
default:
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
Expand Down Expand Up @@ -1277,4 +1306,18 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected,
checkArgument(
got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, expected);
}

/**
* Helper factory to build Avro Logical types schemas for SQL *CHAR types. This method <a
* href="https://github.com/apache/hive/blob/5d268834a5f5278ea76399f8af0d0ab043ae0b45/serde/src/java/org/apache/hadoop/hive/serde2/avro/TypeInfoToSchema.java#L110-L121">represents
* the logical as Hive does</a>.
*/
private static org.apache.avro.Schema buildHiveLogicalTypeSchema(
String hiveLogicalType, int size) {
String schemaJson =
String.format(
"{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}",
hiveLogicalType, size);
return new org.apache.avro.Schema.Parser().parse(schemaJson);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import org.apache.avro.Conversions;
Expand Down Expand Up @@ -57,8 +58,13 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.Instant;
import org.joda.time.LocalTime;
import org.junit.Test;
import org.junit.runner.RunWith;

Expand Down Expand Up @@ -550,6 +556,164 @@ public void testUnionFieldInBeamSchema() {
assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema));
}

@Test
public void testJdbcLogicalVarCharRowDataToAvroSchema() {
String expectedAvroSchemaJson =
"{ "
+ " \"name\": \"topLevelRecord\", "
+ " \"type\": \"record\", "
+ " \"fields\": [{ "
+ " \"name\": \"my_varchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}"
+ " }, "
+ " { "
+ " \"name\": \"my_longvarchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}"
+ " }, "
+ " { "
+ " \"name\": \"my_nvarchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 10}"
+ " }, "
+ " { "
+ " \"name\": \"my_longnvarchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"varchar\", \"maxLength\": 50}"
+ " }, "
+ " { "
+ " \"name\": \"fixed_length_char_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"char\", \"maxLength\": 25}"
+ " } "
+ " ] "
+ "}";

Schema beamSchema =
Schema.builder()
.addField(
Field.of(
"my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
.addField(
Field.of(
"my_longvarchar_field",
FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
.addField(
Field.of(
"my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
.addField(
Field.of(
"my_longnvarchar_field",
FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
.addField(
Field.of(
"fixed_length_char_field",
FieldType.logicalType(JdbcType.StringType.fixedLengthChar(25))))
.build();

assertEquals(
new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
AvroUtils.toAvroSchema(beamSchema));
}

@Test
public void testJdbcLogicalVarCharRowDataToGenericRecord() {
Schema beamSchema =
Schema.builder()
.addField(
Field.of(
"my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
.addField(
Field.of(
"my_longvarchar_field",
FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
.addField(
Field.of(
"my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
.addField(
Field.of(
"my_longnvarchar_field",
FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
.build();

Row rowData =
Row.withSchema(beamSchema)
.addValue("varchar_value")
.addValue("longvarchar_value")
.addValue("nvarchar_value")
.addValue("longnvarchar_value")
.build();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord expectedRecord =
new GenericRecordBuilder(avroSchema)
.set("my_varchar_field", "varchar_value")
.set("my_longvarchar_field", "longvarchar_value")
.set("my_nvarchar_field", "nvarchar_value")
.set("my_longnvarchar_field", "longnvarchar_value")
.build();

assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

@Test
public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() {
String expectedAvroSchemaJson =
"{ "
+ " \"name\": \"topLevelRecord\", "
+ " \"type\": \"record\", "
+ " \"fields\": [{ "
+ " \"name\": \"my_date_field\", "
+ " \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }"
+ " }, "
+ " { "
+ " \"name\": \"my_time_field\", "
+ " \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" }"
+ " }"
+ " ] "
+ "}";

Schema beamSchema =
Schema.builder()
.addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
.addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
.build();

assertEquals(
new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
AvroUtils.toAvroSchema(beamSchema));
}

@Test
public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
// Test Fixed clock at
DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z");

Schema beamSchema =
Schema.builder()
.addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
.addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
.build();

Row rowData =
Row.withSchema(beamSchema)
.addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
.addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get()))
.build();

int daysFromEpoch =
Days.daysBetween(
Instant.EPOCH,
testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
.getDays();
int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord expectedRecord =
new GenericRecordBuilder(avroSchema)
.set("my_date_field", daysFromEpoch)
.set("my_time_field", timeSinceMidNight)
.build();

assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

@Test
public void testBeamRowToGenericRecord() {
GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);
Expand Down Expand Up @@ -640,4 +804,84 @@ public void testNullSchemas() {
AvroUtils.getFromRowFunction(GenericRecord.class),
AvroUtils.getFromRowFunction(GenericRecord.class));
}

/** Helper class that simulate JDBC Logical types. */
private static class JdbcType<T> implements Schema.LogicalType<T, T> {

private static final JdbcType<Instant> DATE =
new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME, "");
private static final JdbcType<Instant> TIME =
new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME, "");

private final String identifier;
private final FieldType argumentType;
private final FieldType baseType;
private final Object argument;

private static class StringType extends JdbcType<String> {

private static StringType fixedLengthChar(int size) {
return new StringType(JDBCType.CHAR, size);
}

private static StringType varchar(int size) {
return new StringType(JDBCType.VARCHAR, size);
}

private static StringType longvarchar(int size) {
return new StringType(JDBCType.LONGVARCHAR, size);
}

private static StringType nvarchar(int size) {
return new StringType(JDBCType.NVARCHAR, size);
}

private static StringType longnvarchar(int size) {
return new StringType(JDBCType.LONGNVARCHAR, size);
}

private StringType(JDBCType type, int size) {
super(type, FieldType.INT32, FieldType.STRING, size);
}
}

private JdbcType(
JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object argument) {
this.identifier = jdbcType.getName();
this.argumentType = argumentType;
this.baseType = baseType;
this.argument = argument;
}

@Override
public String getIdentifier() {
return identifier;
}

@Override
public @Nullable FieldType getArgumentType() {
return argumentType;
}

@Override
public FieldType getBaseType() {
return baseType;
}

@Override
@SuppressWarnings("TypeParameterUnusedInFormals")
public <T1> @Nullable T1 getArgument() {
return (T1) argument;
}

@Override
public @NonNull T toBaseType(@NonNull T input) {
return input;
}

@Override
public @NonNull T toInputType(@NonNull T base) {
return base;
}
}
}
Loading

0 comments on commit 88d4712

Please sign in to comment.