feat(bigquery managedwriter): schema conversion support (#4357)
This is the first of multiple PRs to build up the functionality of a new
thick client over the new BigQuery Storage API's write mechanism.

This PR exposes schema conversion between the main bigquery package and
the BigQuery Storage API.

Towards: #4366
shollyman authored Jul 1, 2021
1 parent 58d4055 commit f2b20f4
Showing 4 changed files with 384 additions and 0 deletions.
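The conversions this commit exposes, BQSchemaToStorageTableSchema and StorageTableSchemaToBQSchema, let a schema round-trip between the two representations. A minimal sketch of how they might be called (the example schema and program scaffolding are illustrative assumptions, not part of this commit):

package main

import (
    "fmt"
    "log"

    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
)

func main() {
    // A hypothetical two-field schema, as the main bigquery package represents it.
    bqSchema := bigquery.Schema{
        {Name: "id", Type: bigquery.IntegerFieldType, Required: true},
        {Name: "tags", Type: bigquery.StringFieldType, Repeated: true},
    }

    // Convert to the storage API's TableSchema proto...
    storageSchema, err := adapt.BQSchemaToStorageTableSchema(bqSchema)
    if err != nil {
        log.Fatal(err)
    }

    // ...and back again.
    roundTripped, err := adapt.StorageTableSchemaToBQSchema(storageSchema)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(len(roundTripped)) // 2
}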
19 changes: 19 additions & 0 deletions bigquery/storage/managedwriter/adapt/doc.go
@@ -0,0 +1,19 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package adapt adds functionality for converting BigQuery representations,
// such as schemas and data types, between the main bigquery package and the
// BigQuery Storage API.
//
// It is EXPERIMENTAL and subject to change or removal without notice.
package adapt
140 changes: 140 additions & 0 deletions bigquery/storage/managedwriter/adapt/schemaconversion.go
@@ -0,0 +1,140 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adapt

import (
    "fmt"

    "cloud.google.com/go/bigquery"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
)

var fieldTypeMap = map[bigquery.FieldType]storagepb.TableFieldSchema_Type{
    bigquery.StringFieldType:     storagepb.TableFieldSchema_STRING,
    bigquery.BytesFieldType:      storagepb.TableFieldSchema_BYTES,
    bigquery.IntegerFieldType:    storagepb.TableFieldSchema_INT64,
    bigquery.FloatFieldType:      storagepb.TableFieldSchema_DOUBLE,
    bigquery.BooleanFieldType:    storagepb.TableFieldSchema_BOOL,
    bigquery.TimestampFieldType:  storagepb.TableFieldSchema_TIMESTAMP,
    bigquery.RecordFieldType:     storagepb.TableFieldSchema_STRUCT,
    bigquery.DateFieldType:       storagepb.TableFieldSchema_DATE,
    bigquery.TimeFieldType:       storagepb.TableFieldSchema_TIME,
    bigquery.DateTimeFieldType:   storagepb.TableFieldSchema_DATETIME,
    bigquery.NumericFieldType:    storagepb.TableFieldSchema_NUMERIC,
    bigquery.BigNumericFieldType: storagepb.TableFieldSchema_BIGNUMERIC,
    bigquery.GeographyFieldType:  storagepb.TableFieldSchema_GEOGRAPHY,
}

func bqFieldToProto(in *bigquery.FieldSchema) (*storagepb.TableFieldSchema, error) {
    if in == nil {
        return nil, nil
    }
    out := &storagepb.TableFieldSchema{
        Name:        in.Name,
        Description: in.Description,
    }

    // Type conversion.
    typ, ok := fieldTypeMap[in.Type]
    if !ok {
        return nil, fmt.Errorf("could not convert field (%s) due to unknown type value: %s", in.Name, in.Type)
    }
    out.Type = typ

    // Mode conversion. Repeated trumps required.
    out.Mode = storagepb.TableFieldSchema_NULLABLE
    if in.Repeated {
        out.Mode = storagepb.TableFieldSchema_REPEATED
    }
    if !in.Repeated && in.Required {
        out.Mode = storagepb.TableFieldSchema_REQUIRED
    }

    for _, s := range in.Schema {
        subField, err := bqFieldToProto(s)
        if err != nil {
            return nil, err
        }
        out.Fields = append(out.Fields, subField)
    }
    return out, nil
}

func protoToBQField(in *storagepb.TableFieldSchema) (*bigquery.FieldSchema, error) {
    if in == nil {
        return nil, nil
    }
    out := &bigquery.FieldSchema{
        Name:        in.GetName(),
        Description: in.GetDescription(),
        Repeated:    in.GetMode() == storagepb.TableFieldSchema_REPEATED,
        Required:    in.GetMode() == storagepb.TableFieldSchema_REQUIRED,
    }

    typeResolved := false
    for k, v := range fieldTypeMap {
        if v == in.GetType() {
            out.Type = k
            typeResolved = true
            break
        }
    }
    if !typeResolved {
        return nil, fmt.Errorf("could not convert proto type to bigquery type: %v", in.GetType().String())
    }

    for _, s := range in.Fields {
        subField, err := protoToBQField(s)
        if err != nil {
            return nil, err
        }
        out.Schema = append(out.Schema, subField)
    }
    return out, nil
}

// BQSchemaToStorageTableSchema converts a bigquery Schema into the protobuf-based TableSchema used
// by the BigQuery Storage WriteClient.
func BQSchemaToStorageTableSchema(in bigquery.Schema) (*storagepb.TableSchema, error) {
    if in == nil {
        return nil, nil
    }
    out := &storagepb.TableSchema{}
    for _, s := range in {
        converted, err := bqFieldToProto(s)
        if err != nil {
            return nil, err
        }
        out.Fields = append(out.Fields, converted)
    }
    return out, nil
}

// StorageTableSchemaToBQSchema converts a TableSchema from the BigQuery Storage WriteClient
// into the equivalent BigQuery Schema.
func StorageTableSchemaToBQSchema(in *storagepb.TableSchema) (bigquery.Schema, error) {
    if in == nil {
        return nil, nil
    }
    var out bigquery.Schema
    for _, s := range in.Fields {
        converted, err := protoToBQField(s)
        if err != nil {
            return nil, err
        }
        out = append(out, converted)
    }
    return out, nil
}
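One behavior worth noting in bqFieldToProto above: a field's mode defaults to NULLABLE, and REPEATED takes precedence when a field is marked both Repeated and Required. A minimal sketch demonstrating that precedence through the exported entry point (the program scaffolding is illustrative, not part of this commit):

package main

import (
    "fmt"
    "log"

    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
)

func main() {
    // A field that (unusually) sets both Repeated and Required.
    schema := bigquery.Schema{
        {Name: "values", Type: bigquery.IntegerFieldType, Repeated: true, Required: true},
    }
    out, err := adapt.BQSchemaToStorageTableSchema(schema)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(out.GetFields()[0].GetMode()) // REPEATED, not REQUIRED
}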
203 changes: 203 additions & 0 deletions bigquery/storage/managedwriter/adapt/schemaconversion_test.go
@@ -0,0 +1,203 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adapt

import (
    "testing"

    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/internal/testutil"
    "github.com/google/go-cmp/cmp"
    storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
    "google.golang.org/protobuf/testing/protocmp"
)

func TestFieldConversions(t *testing.T) {
    testCases := []struct {
        desc  string
        bq    *bigquery.FieldSchema
        proto *storagepb.TableFieldSchema
    }{
        {
            desc:  "nil",
            bq:    nil,
            proto: nil,
        },
        {
            desc: "string field",
            bq: &bigquery.FieldSchema{
                Name:        "name",
                Type:        bigquery.StringFieldType,
                Description: "description",
            },
            proto: &storagepb.TableFieldSchema{
                Name:        "name",
                Type:        storagepb.TableFieldSchema_STRING,
                Description: "description",
                Mode:        storagepb.TableFieldSchema_NULLABLE,
            },
        },
        {
            desc: "required integer field",
            bq: &bigquery.FieldSchema{
                Name:        "name",
                Type:        bigquery.IntegerFieldType,
                Description: "description",
                Required:    true,
            },
            proto: &storagepb.TableFieldSchema{
                Name:        "name",
                Type:        storagepb.TableFieldSchema_INT64,
                Description: "description",
                Mode:        storagepb.TableFieldSchema_REQUIRED,
            },
        },
        {
            desc: "struct with repeated bytes subfield",
            bq: &bigquery.FieldSchema{
                Name:        "name",
                Type:        bigquery.RecordFieldType,
                Description: "description",
                Required:    true,
                Schema: bigquery.Schema{
                    &bigquery.FieldSchema{
                        Name:        "inner1",
                        Repeated:    true,
                        Description: "repeat",
                        Type:        bigquery.BytesFieldType,
                    },
                },
            },
            proto: &storagepb.TableFieldSchema{
                Name:        "name",
                Type:        storagepb.TableFieldSchema_STRUCT,
                Description: "description",
                Mode:        storagepb.TableFieldSchema_REQUIRED,
                Fields: []*storagepb.TableFieldSchema{
                    {
                        Name:        "inner1",
                        Mode:        storagepb.TableFieldSchema_REPEATED,
                        Description: "repeat",
                        Type:        storagepb.TableFieldSchema_BYTES,
                    },
                },
            },
        },
    }

    for _, tc := range testCases {
        // first, bq to proto
        converted, err := bqFieldToProto(tc.bq)
        if err != nil {
            t.Errorf("case (%s) failed conversion from bq: %v", tc.desc, err)
        }
        if diff := cmp.Diff(converted, tc.proto, protocmp.Transform()); diff != "" {
            t.Errorf("conversion to proto diff (%s):\n%v", tc.desc, diff)
        }
        // reverse conversion, proto to bq
        reverse, err := protoToBQField(tc.proto)
        if err != nil {
            t.Errorf("case (%s) failed conversion from proto: %v", tc.desc, err)
        }
        if diff := cmp.Diff(reverse, tc.bq); diff != "" {
            t.Errorf("conversion to BQ diff (%s):\n%v", tc.desc, diff)
        }
    }
}

func TestSchemaConversion(t *testing.T) {
    testCases := []struct {
        description   string
        bqSchema      bigquery.Schema
        storageSchema *storagepb.TableSchema
    }{
        {
            description:   "nil",
            bqSchema:      nil,
            storageSchema: nil,
        },
        {
            description: "scalars",
            bqSchema: bigquery.Schema{
                {Name: "f1", Type: bigquery.StringFieldType},
                {Name: "f2", Type: bigquery.IntegerFieldType},
                {Name: "f3", Type: bigquery.BooleanFieldType},
            },
            storageSchema: &storagepb.TableSchema{
                Fields: []*storagepb.TableFieldSchema{
                    {Name: "f1", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
                    {Name: "f2", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_NULLABLE},
                    {Name: "f3", Type: storagepb.TableFieldSchema_BOOL, Mode: storagepb.TableFieldSchema_NULLABLE},
                },
            },
        },
        {
            description: "array",
            bqSchema: bigquery.Schema{
                {Name: "arr", Type: bigquery.NumericFieldType, Repeated: true},
                {Name: "big", Type: bigquery.BigNumericFieldType, Required: true},
            },
            storageSchema: &storagepb.TableSchema{
                Fields: []*storagepb.TableFieldSchema{
                    {Name: "arr", Type: storagepb.TableFieldSchema_NUMERIC, Mode: storagepb.TableFieldSchema_REPEATED},
                    {Name: "big", Type: storagepb.TableFieldSchema_BIGNUMERIC, Mode: storagepb.TableFieldSchema_REQUIRED},
                },
            },
        },
        {
            description: "nested",
            bqSchema: bigquery.Schema{
                {Name: "struct1", Type: bigquery.RecordFieldType, Schema: []*bigquery.FieldSchema{
                    {Name: "leaf1", Type: bigquery.DateFieldType},
                    {Name: "leaf2", Type: bigquery.DateTimeFieldType},
                }},
                {Name: "field2", Type: bigquery.StringFieldType},
            },
            storageSchema: &storagepb.TableSchema{
                Fields: []*storagepb.TableFieldSchema{
                    {Name: "struct1",
                        Type: storagepb.TableFieldSchema_STRUCT,
                        Mode: storagepb.TableFieldSchema_NULLABLE,
                        Fields: []*storagepb.TableFieldSchema{
                            {Name: "leaf1", Type: storagepb.TableFieldSchema_DATE, Mode: storagepb.TableFieldSchema_NULLABLE},
                            {Name: "leaf2", Type: storagepb.TableFieldSchema_DATETIME, Mode: storagepb.TableFieldSchema_NULLABLE},
                        }},
                    {Name: "field2", Type: storagepb.TableFieldSchema_STRING, Mode: storagepb.TableFieldSchema_NULLABLE},
                },
            },
        },
    }

    for _, tc := range testCases {
        // BQ -> Storage
        storageS, err := BQSchemaToStorageTableSchema(tc.bqSchema)
        if err != nil {
            t.Errorf("BQSchemaToStorageTableSchema(%s): %v", tc.description, err)
        }
        if diff := testutil.Diff(storageS, tc.storageSchema); diff != "" {
            t.Fatalf("BQSchemaToStorageTableSchema(%s): -got, +want:\n%s", tc.description, diff)
        }

        // Storage -> BQ
        bqS, err := StorageTableSchemaToBQSchema(tc.storageSchema)
        if err != nil {
            t.Errorf("StorageTableSchemaToBQSchema(%s): %v", tc.description, err)
        }
        if diff := testutil.Diff(bqS, tc.bqSchema); diff != "" {
            t.Fatalf("StorageTableSchemaToBQSchema(%s): -got, +want:\n%s", tc.description, diff)
        }
    }
}
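The tests above cover successful conversions; the unknown-type error path in bqFieldToProto is easy to exercise as well. A sketch of such a case (a hypothetical additional test, not part of this commit):

func TestUnknownFieldType(t *testing.T) {
    // A field type absent from fieldTypeMap should yield a conversion error.
    in := &bigquery.FieldSchema{Name: "f", Type: bigquery.FieldType("UNKNOWN")}
    if _, err := bqFieldToProto(in); err == nil {
        t.Errorf("expected conversion error for unknown field type, got nil")
    }
}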
22 changes: 22 additions & 0 deletions bigquery/storage/managedwriter/doc.go
@@ -0,0 +1,22 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package managedwriter will be a thick client around the storage API's BigQueryWriteClient.
//
// It is EXPERIMENTAL and subject to change or removal without notice.
//
// Currently, the BigQueryWriteClient this library targets is exposed in the storage v1beta2 endpoint. It is a
// successor to the existing streaming interface, whose primary backend method is tabledata.insertAll and whose
// equivalent in the cloud.google.com/go/bigquery package is the Inserter abstraction.
package managedwriter
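Although the write client itself lands in later PRs, the adapt conversions can already bridge an existing table's schema into the storage API's representation. A minimal sketch, assuming placeholder project, dataset, and table identifiers:

package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery"
    "cloud.google.com/go/bigquery/storage/managedwriter/adapt"
)

func main() {
    ctx := context.Background()
    client, err := bigquery.NewClient(ctx, "my-project") // placeholder project ID
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Fetch the table's metadata; dataset and table names are placeholders.
    md, err := client.Dataset("my_dataset").Table("my_table").Metadata(ctx)
    if err != nil {
        log.Fatal(err)
    }

    // Convert the table's schema into the storage API's TableSchema proto.
    storageSchema, err := adapt.BQSchemaToStorageTableSchema(md.Schema)
    if err != nil {
        log.Fatal(err)
    }
    _ = storageSchema // to be handed to the future write client
}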
