+ * This method transforms an Avro {@link Schema} into a BigQuery
+ * {@link com.google.cloud.bigquery.Schema}. It iterates through the fields of the Avro schema,
+ * converts each field to its BigQuery equivalent, and constructs a BigQuery schema with the
+ * resulting fields.
+ *
+ * <p>For each Avro field, the method extracts the field's name and documentation (if
+ * available), and uses this information to build the corresponding BigQuery field.
+ *
+ * <p>The Avro schema must be of type Avro RECORD.
+ *
+ * @param avroSchema The Avro schema to convert.
+ * @return The converted BigQuery {@link com.google.cloud.bigquery.Schema}.
+ */
+ public static com.google.cloud.bigquery.Schema getBigQuerySchema(Schema avroSchema) {
+ if (avroSchema == null) {
+ throw new IllegalArgumentException(
+ "Could not convert avro schema of expected output to BigQuery table schema. "
+ + "The avro schema of the expected output record cannot be null");
+ }
+ if (avroSchema.getType() == null) {
+ throw new IllegalArgumentException(
+ "Could not convert avro schema of expected output to BigQuery table schema. "
+ + "The avro schema of the output record must have a type");
+ }
+ if (avroSchema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert avro schema of expected output to BigQuery table "
+ + "schema. Avro schema must be of type RECORD, found %s",
+ avroSchema.getType()));
+ }
+ if (avroSchema.getFields().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Could not convert avro schema of expected output to BigQuery table schema. "
+ + "The avro schema of the output record must have at least one field.");
+ }
+ // Iterate over each record field and add them to the BigQuery schema.
+ List BigQuery supports nullable fields but not unions with multiple non-null types. This method
+ * validates the Avro UNION schema to ensure it conforms to BigQuery's requirements.
+ *
+ * Valid UNION schemas are:
+ *
+ * Invalid UNION schemas include:
+ *
+ * If the UNION schema is valid, this method returns a BigQuery field with the schema of the
+ * non-null datatype. Otherwise, it throws an {@link IllegalArgumentException}.
+ *
+ * @param avroSchema The Avro UNION schema to process.
+ * @return The BigQuery field corresponding to the non-null type in the UNION.
+ * @throws IllegalArgumentException if the UNION schema is invalid for BigQuery.
+ */
+ private static Field convertAvroUnionFieldToBigQueryField(
+ Schema avroSchema, String name, int nestedLevel) {
+ List The following restrictions imposed by BigQuery's schema definition:
+ *
+ * If any of these restrictions are violated, an {@link IllegalArgumentException} is thrown.
+ *
+ * @param avroSchema The Avro schema of the repeated field.
+ * @param name The name of the field.
+ * @param nestedLevel The nesting level of the field.
+ * @return The converted BigQuery {@link Field} with mode set to {@link Field.Mode#REPEATED}.
+ * @throws IllegalArgumentException if the Avro schema violates any of the restrictions for
+ * BigQuery repeated fields.
+ */
+ private static Field convertAvroRepeatedFieldToBigQueryField(
+ Schema avroSchema, String name, int nestedLevel) {
+ // The ARRAY's element schema determines the BigQuery column type.
+ Schema arrayAvroSchema = avroSchema.getElementType();
+ // BigQuery does not allow an ARRAY whose elements are themselves ARRAYs.
+ if (arrayAvroSchema.getType() == Schema.Type.ARRAY) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert schema of avro field: %s to BigQuery table schema. "
+ + "BigQuery ARRAY cannot have recursive ARRAY fields.",
+ name));
+ }
+ // Convert the element type; REQUIRED mode on the result means the element is non-null.
+ Field innerArrayField = convertAvroFieldToBigQueryField(arrayAvroSchema, name, nestedLevel);
+ // BigQuery ARRAY elements must be non-nullable, so any mode other than REQUIRED
+ // (i.e. a NULLABLE element) is rejected here.
+ if (innerArrayField.getMode() != Field.Mode.REQUIRED) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert schema of avro field: %s to BigQuery table schema. "
+ + "Array cannot have a NULLABLE element in BigQuery",
+ name));
+ }
+ // Reuse the converted element field as the column definition, flipping mode to REPEATED.
+ return innerArrayField.toBuilder().setMode(Field.Mode.REPEATED).build();
+ }
+
+ /**
+ * Helper function convert Avro Field to BigQuery Field for Primitive and Logical Datatypes.
+ *
+ * <p>LOGICAL: Use avroSchema.getProp() to obtain the string for the logical-type property and
+ * search for its corresponding mapping in the LOGICAL_AVRO_TYPES_TO_BQ_TYPES map.
+ *
+ * PRIMITIVE: If there is no match for the logical type (or there is no logical type
+ * present), the field data type is attempted to be mapped to a PRIMITIVE type map.
+ *
+ * @param avroSchema the Avro schema of the required field
+ * @param name the name of the field
+ * @return the converted BigQuery field with the appropriate data type and mode set to REQUIRED
+ * @throws UnsupportedOperationException if the Avro type is not supported
+ * @throws IllegalArgumentException if the Avro schema is invalid for a decimal logical type
+ */
+ private static Field convertAvroRequiredFieldToBigQueryField(Schema avroSchema, String name)
+ throws UnsupportedOperationException, IllegalArgumentException {
+ StandardSQLTypeName dataType;
+
+ // Handle decimal logical types by extracting precision and setting the appropriate
+ // StandardSQLTypeName.
+ // The conversions are according to
+ // https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#decimal_types
+ if ("decimal".equals(avroSchema.getProp(LogicalType.LOGICAL_TYPE_PROP))) {
+ dataType = handleDecimalLogicalType(avroSchema, name);
+ if (dataType == StandardSQLTypeName.NUMERIC
+ || dataType == StandardSQLTypeName.BIGNUMERIC) {
+ // The precision and scale is also set in the BigQuery Field
+ long precision =
+ ((LogicalTypes.Decimal) avroSchema.getLogicalType()).getPrecision();
+ long scale = ((LogicalTypes.Decimal) avroSchema.getLogicalType()).getScale();
+ return Field.newBuilder(name, dataType)
+ .setMode(Field.Mode.REQUIRED)
+ .setPrecision(precision)
+ .setScale(scale)
+ .build();
+ }
+ } else {
+ // Non-decimal path: try the logical-type mapping first, then fall back to the
+ // primitive-type mapping keyed on the schema's base type.
+ dataType =
+ Optional.ofNullable(avroSchema.getProp(LogicalType.LOGICAL_TYPE_PROP))
+ .map(LOGICAL_AVRO_TYPES_TO_BQ_TYPES::get)
+ .orElse(PRIMITIVE_AVRO_TYPES_TO_BQ_TYPES.get(avroSchema.getType()));
+ }
+ // A null here means neither mapping recognized the Avro type.
+ if (dataType == null) {
+ throw new UnsupportedOperationException(
+ getUnsupportedTypeErrorMessage(avroSchema.getType().toString(), name));
+ }
+
+ // NOTE(review): handleDecimalLogicalType only returns NUMERIC or BIGNUMERIC (or throws),
+ // so the decimal branch always returns above; this final return serves the non-decimal path.
+ return Field.newBuilder(name, dataType).setMode(Field.Mode.REQUIRED).build();
+ }
+
+ /**
+ * Helper method to handle Avro decimal logical types by determining the appropriate BigQuery
+ * data type based on the precision of the decimal.
+ *
+ * @param avroSchema the Avro schema of the decimal field
+ * @param name the name of the field
+ * @return {@link StandardSQLTypeName#NUMERIC} or {@link StandardSQLTypeName#BIGNUMERIC},
+ *     depending on the decimal's precision and scale
+ * @throws IllegalArgumentException if the precision/scale combination fits neither BigQuery
+ *     NUMERIC nor BIGNUMERIC
+ */
+ private static StandardSQLTypeName handleDecimalLogicalType(Schema avroSchema, String name) {
+ // Reject non-positive precision, negative scale, or scale > precision up front.
+ validatePrecisionAndScale(avroSchema, name);
+ long precision = ((LogicalTypes.Decimal) avroSchema.getLogicalType()).getPrecision();
+ long scale = ((LogicalTypes.Decimal) avroSchema.getLogicalType()).getScale();
+
+ // NUMERIC fits when the integer-part digits (precision - scale) and the scale both stay
+ // within BigQuery's NUMERIC limits.
+ if (precision > 0
+ && precision <= (scale + (MAX_BQ_NUMERIC_PRECISION - MAX_BQ_NUMERIC_SCALE))
+ && scale <= MAX_BQ_NUMERIC_SCALE) {
+ return StandardSQLTypeName.NUMERIC;
+ } else if (precision > 0
+ && precision <= (scale + (MAX_BQ_BIGNUMERIC_PRECISION - MAX_BQ_BIGNUMERIC_SCALE))
+ && scale <= MAX_BQ_BIGNUMERIC_SCALE) {
+ // Fall back to the wider BIGNUMERIC for decimals too large for NUMERIC.
+ return StandardSQLTypeName.BIGNUMERIC;
+ }
+ // Neither BigQuery decimal type can represent this precision/scale combination.
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert schema of avro field: %s to BigQuery table schema. "
+ + "BigQuery does not support Decimal types with "
+ + "precision %d and scale %d.",
+ name, precision, scale));
+ }
+
+ /**
+ * Validates the precision and scale of an Avro decimal logical type. Ensures precision and
+ * scale are non-negative and that scale does not exceed precision.
+ *
+ * @param schema the Avro schema of the decimal field
+ * @param name the name of the field
+ * @throws IllegalArgumentException if precision or scale are invalid
+ */
+ private static void validatePrecisionAndScale(Schema schema, String name) {
+ LogicalTypes.Decimal decimalLogicalSchema =
+ ((LogicalTypes.Decimal) schema.getLogicalType());
+ // The Avro schema of logical type "decimal" has "precision" and "scale" attributes
+ // inherently defined so null checks for these attributes are not required.
+ long precision = decimalLogicalSchema.getPrecision();
+ long scale = decimalLogicalSchema.getScale();
+ // NOTE(review): this check rejects precision == 0, i.e. precision must be strictly
+ // positive, but the message says "non-negative" — consider rewording the message.
+ if (precision <= 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert avro field: %s of expected output to BigQuery table "
+ + "schema. Precision of decimal avro field must be "
+ + "non-negative. Saw: %d",
+ name, precision));
+ }
+ if (scale < 0) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert avro field: %s of expected output to BigQuery table "
+ + "schema. Scale of decimal avro field must be "
+ + "non-negative. Saw: %d",
+ name, scale));
+ }
+ // Scale greater than precision is meaningless for a decimal; fail with a clear error.
+ if (precision < scale) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Could not convert avro field: %s of expected output to BigQuery table "
+ + "schema. Scale of the decimal avro field cannot exceed "
+ + "precision. Saw scale: %d, precision: %d",
+ name, scale, precision));
+ }
+ }
+
+ /**
+ * Builds the standard error message used when an Avro type has no BigQuery mapping.
+ *
+ * @param unsupportedAvroType string form of the unsupported Avro type
+ * @param fieldName name of the field whose type could not be converted
+ * @return a formatted, human-readable error message
+ */
+ @VisibleForTesting
+ public static String getUnsupportedTypeErrorMessage(
+ String unsupportedAvroType, String fieldName) {
+ return String.format(
+ "Could not convert avro field: %s of expected output to BigQuery table schema. "
+ + "The avro type: %s of the field is not supported by BigQuery",
+ fieldName, unsupportedAvroType);
+ }
+}
diff --git a/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/common/utils/AvroToBigQuerySchemaTransformTest.java b/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/common/utils/AvroToBigQuerySchemaTransformTest.java
new file mode 100644
index 00000000..20c35537
--- /dev/null
+++ b/flink-connector-bigquery-common/src/test/java/com/google/cloud/flink/bigquery/common/utils/AvroToBigQuerySchemaTransformTest.java
@@ -0,0 +1,620 @@
+/*
+ * Copyright (C) 2024 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.flink.bigquery.common.utils;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.FieldList;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.avro.Schema.Type;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.cloud.flink.bigquery.common.utils.AvroToBigQuerySchemaTransform.getUnsupportedTypeErrorMessage;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+/** Unit tests for {@link AvroToBigQuerySchemaTransform}. */
+public class AvroToBigQuerySchemaTransformTest {
+
+ private static final String GEOGRAPHY_LOGICAL_TYPE_NAME = "geography_wkt";
+ private static final String JSON_LOGICAL_TYPE_NAME = "Json";
+
+ /** Builds a REQUIRED (non-null) BigQuery field of the given type. */
+ private static Field createRequiredBigqueryField(String name, StandardSQLTypeName type) {
+ Field.Builder builder = Field.newBuilder(name, type);
+ builder.setMode(Field.Mode.REQUIRED);
+ return builder.build();
+ }
+
+ /** Builds a NULLABLE BigQuery field of the given type. */
+ private static Field createNullableBigqueryField(String name, StandardSQLTypeName type) {
+ Field.Builder builder = Field.newBuilder(name, type);
+ builder.setMode(Field.Mode.NULLABLE);
+ return builder.build();
+ }
+
+ /** Builds a REQUIRED RECORD field containing the given sub-fields. */
+ private static Field createRecordField(String name, Field... subFields) {
+ Field.Builder builder =
+ Field.newBuilder(name, LegacySQLTypeName.RECORD, FieldList.of(subFields));
+ builder.setMode(Field.Mode.REQUIRED);
+ return builder.build();
+ }
+
+ /** Builds a REPEATED (array) BigQuery field of the given type. */
+ private static Field createRepeatedBigqueryField(String name, StandardSQLTypeName type) {
+ Field.Builder builder = Field.newBuilder(name, type);
+ builder.setMode(Field.Mode.REPEATED);
+ return builder.build();
+ }
+
+ /** Tests an Avro record schema covering primitive, logical, array and nested record fields. */
+ @Test
+ public void testAllTypesSchemaSuccessful() {
+ // Avro schema under test, expressed as a JSON literal.
+ String avroSchemaString =
+ "{\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"allTypes\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"string_field\", \"type\": \"string\"},\n"
+ " {\"name\": \"bytes_field\", \"type\": \"bytes\"},\n"
+ " {\"name\": \"integer_field\", \"type\": \"long\"},\n"
+ " {\"name\": \"array_field\", \"type\": {\"type\": \"array\", "
+ "\"items\": \"double\"}},\n"
+ " {\"name\": \"numeric_field\", \"type\": [\"null\", {\"type\": "
+ "\"bytes\", \"logicalType\": \"decimal\", \"precision\": 38, \"scale\":"
+ " 9}]},\n"
+ " {\"name\": \"bignumeric_field\", \"type\": [\"null\", {\"type\": "
+ "\"bytes\", \"logicalType\": \"decimal\", \"precision\": 76, \"scale\":"
+ " 38}]},\n"
+ " {\"name\": \"boolean_field\", \"type\": [\"null\", \"boolean\"]},\n"
+ " {\"name\": \"ts_field\", \"type\": [\"null\", {\"type\": \"long\","
+ " \"logicalType\": \"timestamp-micros\"}]},\n"
+ " {\"name\": \"date_field\", \"type\": [\"null\", {\"type\": "
+ "\"int\", \"logicalType\": \"date\"}]},\n"
+ " {\"name\": \"time_field\", \"type\": [\"null\", {\"type\": "
+ "\"long\", \"logicalType\": \"time-micros\"}]},\n"
+ " {\"name\": \"datetime_field\", \"type\": [\"null\", {\"type\": "
+ "\"long\", \"logicalType\": \"local-timestamp-micros\"}]},\n"
+ " {\"name\": \"geography_field\", \"type\": [\"null\", {\"type\": "
+ "\"string\", \"logicalType\": \"geography_wkt\"}]},\n"
+ " {\"name\": \"record_field\", \"type\": {\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"record_field\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"json_field\", \"type\": [\"null\", {\"type\": "
+ "\"string\", \"logicalType\": \"Json\"}]},\n"
+ " {\"name\": \"geography_field\", \"type\": [\"null\", "
+ "{\"type\": \"string\", \"logicalType\": \"geography_wkt\"}]}\n"
+ " ]\n"
+ " }}\n"
+ " ]\n"
+ "}";
+ Schema allTypesSchema = new Parser().parse(avroSchemaString);
+
+ // Expected BigQuery schema mirroring the Avro definition above, field by field.
+ com.google.cloud.bigquery.Schema expectedBqSchema =
+ com.google.cloud.bigquery.Schema.of(
+ createRequiredBigqueryField("string_field", StandardSQLTypeName.STRING),
+ createRequiredBigqueryField("bytes_field", StandardSQLTypeName.BYTES),
+ createRequiredBigqueryField("integer_field", StandardSQLTypeName.INT64),
+ createRepeatedBigqueryField("array_field", StandardSQLTypeName.FLOAT64),
+ createNullableBigqueryField("numeric_field", StandardSQLTypeName.NUMERIC)
+ .toBuilder()
+ .setPrecision(38L)
+ .setScale(9L)
+ .build(),
+ createNullableBigqueryField(
+ "bignumeric_field", StandardSQLTypeName.BIGNUMERIC)
+ .toBuilder()
+ .setPrecision(76L)
+ .setScale(38L)
+ .build(),
+ createNullableBigqueryField("boolean_field", StandardSQLTypeName.BOOL),
+ createNullableBigqueryField("ts_field", StandardSQLTypeName.TIMESTAMP),
+ createNullableBigqueryField("date_field", StandardSQLTypeName.DATE),
+ createNullableBigqueryField("time_field", StandardSQLTypeName.TIME),
+ createNullableBigqueryField("datetime_field", StandardSQLTypeName.DATETIME),
+ createNullableBigqueryField(
+ "geography_field", StandardSQLTypeName.GEOGRAPHY),
+ Field.newBuilder(
+ "record_field",
+ LegacySQLTypeName.RECORD,
+ FieldList.of(
+ createNullableBigqueryField(
+ "json_field", StandardSQLTypeName.JSON),
+ createNullableBigqueryField(
+ "geography_field",
+ StandardSQLTypeName.GEOGRAPHY)))
+ .setMode(Field.Mode.REQUIRED)
+ .build());
+
+ // Convert and compare the two schemas field-by-field.
+ com.google.cloud.bigquery.Schema bqSchema =
+ AvroToBigQuerySchemaTransform.getBigQuerySchema(allTypesSchema);
+ assertExactSchema(bqSchema, expectedBqSchema);
+ }
+
+ /** Tests Avro Schema with all Logical Data Types. */
+ @Test
+ public void testLogicalTypesSuccessful() {
+ // Create an Avro schema with logical types
+ Schema avroSchema = Schema.createRecord("RecordWithLogicalTypes", "", "", false);
+ ArrayList This should throw Exception as this is an infinite recursion and is not supported by
+ * BigQuery
+ */
+ @Test
+ public void testInfiniteRecursiveSchemaThrowsException() {
+ // Build the Avro schema programmatically
+ Schema longListSchema = Schema.createRecord("LongList", "", "", false);
+ longListSchema.addAlias("LinkedLongs");
+
+ ArrayList
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *