[BEAM-12385] Handle VARCHAR and other SQL specific logical types in AvroUtils #14858

Closed · wants to merge 7 commits
CHANGES.md: 3 additions & 0 deletions
@@ -64,6 +64,9 @@
## New Features / Improvements

* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* Add support to convert Beam Schema to Avro Schema for JDBC LogicalTypes:
`VARCHAR`, `NVARCHAR`, `LONGVARCHAR`, `LONGNVARCHAR`, `DATE`, `TIME`
  (Java) ([BEAM-12385](https://issues.apache.org/jira/browse/BEAM-12385)).
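
  A minimal sketch of the conversion this entry adds, assuming a Schema.LogicalType whose identifier is a JDBC type name such as VARCHAR (the JdbcType helper in this PR's tests is one such implementation; it is not a public Beam API):

      Schema beamSchema =
          Schema.builder()
              .addField(Field.of("name", FieldType.logicalType(JdbcType.StringType.varchar(10))))
              .build();
      org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
      // The field's Avro schema: {"type": "string", "logicalType": "VARCHAR", "maxLength": 10}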

## Breaking Changes

@@ -29,6 +29,7 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.JDBCType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -906,6 +907,23 @@ private static org.apache.avro.Schema getFieldSchema(
.map(x -> getFieldSchema(x.getType(), x.getName(), namespace))
.collect(Collectors.toList()));
break;

case "NVARCHAR":
case "VARCHAR":
case "LONGNVARCHAR":
case "LONGVARCHAR":
baseType = makeJdbcLogicalStringAvroType(fieldType.getLogicalType());
break;

case "DATE":
baseType = LogicalTypes.date().addToSchema(org.apache.avro.Schema.create(Type.INT));
break;

case "TIME":
baseType =
LogicalTypes.timeMillis().addToSchema(org.apache.avro.Schema.create(Type.INT));
break;

default:
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1017,6 +1035,19 @@ private static org.apache.avro.Schema getFieldSchema(
typeWithNullability.type.getTypes().get(oneOfValue.getCaseType().getValue()),
oneOfValue.getValue());
}

case "NVARCHAR":
case "VARCHAR":
case "LONGNVARCHAR":
case "LONGVARCHAR":
return new Utf8((String) value);

case "DATE":
return Days.daysBetween(Instant.EPOCH, (Instant) value).getDays();

case "TIME":
return (int) ((Instant) value).getMillis();

default:
throw new RuntimeException(
"Unhandled logical type " + fieldType.getLogicalType().getIdentifier());
@@ -1277,4 +1308,17 @@ private static void checkTypeName(Schema.TypeName got, Schema.TypeName expected,
checkArgument(
got.equals(expected), "Can't convert '%s' to %s, expected: %s", label, got, expected);
}

/** Helper factory to build Avro string schemas for JDBC logical types. */
private static org.apache.avro.Schema makeJdbcLogicalStringAvroType(
Schema.LogicalType<?, ?> logicalType) {
JDBCType jdbcType = JDBCType.valueOf(logicalType.getIdentifier());
Integer size = logicalType.getArgument();

  String schemaJson =
      String.format(
          "{\"type\": \"string\", \"logicalType\": \"%s\", \"maxLength\": %s}",
          jdbcType.name(), size);
  return new org.apache.avro.Schema.Parser().parse(schemaJson);
}
}

Review thread on makeJdbcLogicalStringAvroType:

RyanSkraba (Contributor) commented:
Hey, optionally you can create Avro schemas in a more readable way:

return org.apache.avro.SchemaBuilder.builder()
    .stringBuilder().prop("logicalType", jdbcType.name()).prop("maxLength", size).endString();

This is for info -- they should (must!) be equivalent to using the Parser. This snippet is small but builders are a good practice for larger schemas!

anantdamle (Contributor, Author) replied:
@RyanSkraba thanks for the suggestion; the specific reason I didn't use the SchemaBuilder is the need for an Integer (non-String) value for the property. The schema produced by SchemaBuilder looks like:
{"type":"string","logicalType":"LONGVARCHAR","maxLength": "50"}
vs. the expected:
{"type":"string","logicalType":"LONGVARCHAR","maxLength": 50}

I think it's for the same reason that Hive's TypeInfoToSchema#L116 also uses the JSON-based parsing approach.
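
(For context, a small standalone check, not from the thread, that the Parser route keeps maxLength numeric; Avro's Schema.getObjectProp returns the parsed JSON value:)

    org.apache.avro.Schema parsed =
        new org.apache.avro.Schema.Parser()
            .parse("{\"type\": \"string\", \"logicalType\": \"VARCHAR\", \"maxLength\": 10}");
    // Prints 10 (an Integer), not the string "10".
    System.out.println(parsed.getObjectProp("maxLength"));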

RyanSkraba (Contributor) replied:
Huh, I'm surprised -- thanks for pointing this out. It might be an Avro bug! I'm pretty sure that if size is an int it should be a JSON number.
Regardless, thanks for the update; let's use the Hive approach, then!

iemejia (Member) commented:
I hate to be this annoying @anantdamle, but since we are trying to 'align' with Hive/Spark, maybe it is good to name the logical types to coincide with the ones in the class you mention, which are in lowercase.

https://github.com/apache/hive/blob/135629b8d6b538fed092641537034a9fbc59c7a0/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java#L59

anantdamle (Contributor, Author) replied on Jun 4, 2021:
No worries @iemejia. The concern is that Hive only provides varchar; how do we then deal with the others, like longvarchar, etc.?
If I use lowercase, converting back to JDBCType would become hard.
Do you suggest converting all the string-based logical types to just varchar with an appropriate maxLength?

The approach in the JdbcIO schema is to represent them with the uppercase JDBC logical type names.

iemejia (Member) replied:
All the logical types that Hive/Spark define are spelled in lowercase, so let's do the same.

It is a bit odd compared with the Java SQL types (in uppercase), but at least SQL should be case-agnostic, so it should not matter on that front.

anantdamle (Contributor, Author) replied:
@iemejia made the changes, with the following mapping as per Hive, since there are only two categories:

Variable-length strings: [VARCHAR, LONGVARCHAR, NVARCHAR, LONGNVARCHAR] -> varchar

Fixed-length strings: [CHAR, NCHAR] -> char
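
A minimal sketch of that two-category mapping (a hypothetical helper for illustration, not code from this PR):

    // Hypothetical: collapses the JDBC string types into Hive-style logical type names.
    static String hiveStyleLogicalTypeName(java.sql.JDBCType jdbcType) {
      switch (jdbcType) {
        case VARCHAR:
        case LONGVARCHAR:
        case NVARCHAR:
        case LONGNVARCHAR:
          return "varchar"; // variable-length strings
        case CHAR:
        case NCHAR:
          return "char"; // fixed-length strings
        default:
          throw new IllegalArgumentException("Not a string JDBC type: " + jdbcType);
      }
    }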

@@ -26,6 +26,7 @@
import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.JDBCType;
import java.util.List;
import java.util.Map;
import org.apache.avro.Conversions;
@@ -57,8 +58,13 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.Instant;
import org.joda.time.LocalTime;
import org.junit.Test;
import org.junit.runner.RunWith;

@@ -550,6 +556,156 @@ public void testUnionFieldInBeamSchema() {
assertEquals(expectedGenericRecord, AvroUtils.toGenericRecord(row, avroSchema));
}

@Test
public void testJdbcLogicalVarCharRowDataToAvroSchema() {
String expectedAvroSchemaJson =
"{ "
+ " \"name\": \"topLevelRecord\", "
+ " \"type\": \"record\", "
+ " \"fields\": [{ "
+ " \"name\": \"my_varchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"VARCHAR\", \"maxLength\": 10}"
+ " }, "
+ " { "
+ " \"name\": \"my_longvarchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGVARCHAR\", \"maxLength\": 50}"
+ " }, "
+ " { "
+ " \"name\": \"my_nvarchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"NVARCHAR\", \"maxLength\": 10}"
+ " }, "
+ " { "
+ " \"name\": \"my_longnvarchar_field\", "
+ " \"type\": {\"type\": \"string\", \"logicalType\": \"LONGNVARCHAR\", \"maxLength\": 50}"
+ " } "
+ " ] "
+ "}";

Schema beamSchema =
Schema.builder()
.addField(
Field.of(
"my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
.addField(
Field.of(
"my_longvarchar_field",
FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
.addField(
Field.of(
"my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
.addField(
Field.of(
"my_longnvarchar_field",
FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
.build();

assertEquals(
new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
AvroUtils.toAvroSchema(beamSchema));
}

@Test
public void testJdbcLogicalVarCharRowDataToGenericRecord() {
Schema beamSchema =
Schema.builder()
.addField(
Field.of(
"my_varchar_field", FieldType.logicalType(JdbcType.StringType.varchar(10))))
.addField(
Field.of(
"my_longvarchar_field",
FieldType.logicalType(JdbcType.StringType.longvarchar(50))))
.addField(
Field.of(
"my_nvarchar_field", FieldType.logicalType(JdbcType.StringType.nvarchar(10))))
.addField(
Field.of(
"my_longnvarchar_field",
FieldType.logicalType(JdbcType.StringType.longnvarchar(50))))
.build();

Row rowData =
Row.withSchema(beamSchema)
.addValue("varchar_value")
.addValue("longvarchar_value")
.addValue("nvarchar_value")
.addValue("longnvarchar_value")
.build();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord expectedRecord =
new GenericRecordBuilder(avroSchema)
.set("my_varchar_field", "varchar_value")
.set("my_longvarchar_field", "longvarchar_value")
.set("my_nvarchar_field", "nvarchar_value")
.set("my_longnvarchar_field", "longnvarchar_value")
.build();

assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

@Test
public void testJdbcLogicalDateAndTimeRowDataToAvroSchema() {
String expectedAvroSchemaJson =
"{ "
+ " \"name\": \"topLevelRecord\", "
+ " \"type\": \"record\", "
+ " \"fields\": [{ "
+ " \"name\": \"my_date_field\", "
+ " \"type\": { \"type\": \"int\", \"logicalType\": \"date\" }"
+ " }, "
+ " { "
+ " \"name\": \"my_time_field\", "
+ " \"type\": { \"type\": \"int\", \"logicalType\": \"time-millis\" }"
+ " }"
+ " ] "
+ "}";

Schema beamSchema =
Schema.builder()
.addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
.addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
.build();

assertEquals(
new org.apache.avro.Schema.Parser().parse(expectedAvroSchemaJson),
AvroUtils.toAvroSchema(beamSchema));
}

@Test
public void testJdbcLogicalDateAndTimeRowDataToGenericRecord() {
// Fixed timestamp used as the test fixture.
DateTime testDateTime = DateTime.parse("2021-05-29T11:15:16.234Z");

Schema beamSchema =
Schema.builder()
.addField(Field.of("my_date_field", FieldType.logicalType(JdbcType.DATE)))
.addField(Field.of("my_time_field", FieldType.logicalType(JdbcType.TIME)))
.build();

Row rowData =
Row.withSchema(beamSchema)
.addValue(testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
.addValue(Instant.ofEpochMilli(testDateTime.toLocalTime().millisOfDay().get()))
.build();

int daysFromEpoch =
Days.daysBetween(
Instant.EPOCH,
testDateTime.toLocalDate().toDateTime(LocalTime.MIDNIGHT).toInstant())
.getDays();
int timeSinceMidNight = testDateTime.toLocalTime().getMillisOfDay();

org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(beamSchema);
GenericRecord expectedRecord =
new GenericRecordBuilder(avroSchema)
.set("my_date_field", daysFromEpoch)
.set("my_time_field", timeSinceMidNight)
.build();

assertEquals(expectedRecord, AvroUtils.toGenericRecord(rowData, avroSchema));
}

@Test
public void testBeamRowToGenericRecord() {
GenericRecord genericRecord = AvroUtils.toGenericRecord(getBeamRow(), null);
@@ -640,4 +796,80 @@ public void testNullSchemas() {
AvroUtils.getFromRowFunction(GenericRecord.class),
AvroUtils.getFromRowFunction(GenericRecord.class));
}

/** Helper class that simulates JDBC logical types. */
private static class JdbcType<T> implements Schema.LogicalType<T, T> {

private static final JdbcType<Instant> DATE =
new JdbcType<>(JDBCType.DATE, FieldType.STRING, FieldType.DATETIME, "");
private static final JdbcType<Instant> TIME =
new JdbcType<>(JDBCType.TIME, FieldType.STRING, FieldType.DATETIME, "");

private final String identifier;
private final FieldType argumentType;
private final FieldType baseType;
private final Object argument;

private static class StringType extends JdbcType<String> {

private static StringType varchar(int size) {
return new StringType(JDBCType.VARCHAR, size);
}

private static StringType longvarchar(int size) {
return new StringType(JDBCType.LONGVARCHAR, size);
}

private static StringType nvarchar(int size) {
return new StringType(JDBCType.NVARCHAR, size);
}

private static StringType longnvarchar(int size) {
return new StringType(JDBCType.LONGNVARCHAR, size);
}

private StringType(JDBCType type, int size) {
super(type, FieldType.INT32, FieldType.STRING, size);
}
}

private JdbcType(
JDBCType jdbcType, FieldType argumentType, FieldType baseType, Object argument) {
this.identifier = jdbcType.getName();
this.argumentType = argumentType;
this.baseType = baseType;
this.argument = argument;
}

@Override
public String getIdentifier() {
return identifier;
}

@Override
public @Nullable FieldType getArgumentType() {
return argumentType;
}

@Override
public FieldType getBaseType() {
return baseType;
}

@Override
@SuppressWarnings("TypeParameterUnusedInFormals")
public <T1> @Nullable T1 getArgument() {
return (T1) argument;
}

@Override
public @NonNull T toBaseType(@NonNull T input) {
return input;
}

@Override
public @NonNull T toInputType(@NonNull T base) {
return base;
}
}
}