Skip to content

Commit

Permalink
feat(bigquery): json support on managedwriter/adapt pkg (#10542)
Browse files Browse the repository at this point in the history
  • Loading branch information
alvarowolfx authored Jul 12, 2024
1 parent fd53d47 commit 978d4a1
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 56 deletions.
2 changes: 2 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.Fiel
storagepb.TableFieldSchema_TIME: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_TIMESTAMP: descriptorpb.FieldDescriptorProto_TYPE_INT64,
storagepb.TableFieldSchema_RANGE: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE,
storagepb.TableFieldSchema_JSON: descriptorpb.FieldDescriptorProto_TYPE_STRING,
}

var allowedRangeTypes = []storagepb.TableFieldSchema_Type{
Expand Down Expand Up @@ -107,6 +108,7 @@ var bqTypeToWrapperMap = map[storagepb.TableFieldSchema_Type]string{
storagepb.TableFieldSchema_STRING: ".google.protobuf.StringValue",
storagepb.TableFieldSchema_TIME: ".google.protobuf.Int64Value",
storagepb.TableFieldSchema_TIMESTAMP: ".google.protobuf.Int64Value",
storagepb.TableFieldSchema_JSON: ".google.protobuf.StringValue",
}

// filename used by well known types proto
Expand Down
66 changes: 66 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,65 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
{
description: "json type",
bq: &storagepb.TableSchema{
Fields: []*storagepb.TableFieldSchema{
{Name: "json_required", Type: storagepb.TableFieldSchema_JSON, Mode: storagepb.TableFieldSchema_REQUIRED},
{Name: "json_optional", Type: storagepb.TableFieldSchema_JSON, Mode: storagepb.TableFieldSchema_NULLABLE},
}},
wantProto2: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("json_required"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum()},
{
Name: proto.String("json_optional"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
wantProto2Normalized: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("json_required"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum(),
},
{
Name: proto.String("json_optional"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("json_required"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("json_optional"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".google.protobuf.StringValue"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
{
// exercise construct of a submessage
description: "nested",
Expand Down Expand Up @@ -1236,6 +1295,13 @@ func TestNormalizeDescriptor(t *testing.T) {
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("json_type"),
JsonName: proto.String("jsonType"),
Number: proto.Int32(4),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Expand Down
1 change: 1 addition & 0 deletions bigquery/storage/managedwriter/adapt/schemaconversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var fieldTypeMap = map[bigquery.FieldType]storagepb.TableFieldSchema_Type{
bigquery.BigNumericFieldType: storagepb.TableFieldSchema_BIGNUMERIC,
bigquery.GeographyFieldType: storagepb.TableFieldSchema_GEOGRAPHY,
bigquery.RangeFieldType: storagepb.TableFieldSchema_RANGE,
bigquery.JSONFieldType: storagepb.TableFieldSchema_JSON,
}

func bqFieldToProto(in *bigquery.FieldSchema) (*storagepb.TableFieldSchema, error) {
Expand Down
26 changes: 26 additions & 0 deletions bigquery/storage/managedwriter/adapt/schemaconversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ func TestFieldConversions(t *testing.T) {
},
},
},
{
desc: "json type",
bq: &bigquery.FieldSchema{
Name: "name",
Type: bigquery.JSONFieldType,
Description: "description",
Required: true,
},
proto: &storagepb.TableFieldSchema{
Name: "name",
Type: storagepb.TableFieldSchema_JSON,
Description: "description",
Mode: storagepb.TableFieldSchema_REQUIRED,
},
},
{
desc: "range type",
bq: &bigquery.FieldSchema{
Expand Down Expand Up @@ -200,6 +215,17 @@ func TestSchemaConversion(t *testing.T) {
},
},
},
{
description: "json type",
bqSchema: bigquery.Schema{
{Name: "json", Type: bigquery.JSONFieldType},
},
storageSchema: &storagepb.TableSchema{
Fields: []*storagepb.TableFieldSchema{
{Name: "json", Type: storagepb.TableFieldSchema_JSON, Mode: storagepb.TableFieldSchema_NULLABLE},
},
},
},
{
description: "range types",
bqSchema: bigquery.Schema{
Expand Down
63 changes: 63 additions & 0 deletions bigquery/storage/managedwriter/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Parallel()
testDefaultStreamDynamicJSON(ctx, t, mwClient, bqClient, dataset)
})
t.Run("DefaultStreamJSONData", func(t *testing.T) {
t.Parallel()
testDefaultStreamJSONData(ctx, t, mwClient, bqClient, dataset)
})
t.Run("CommittedStream", func(t *testing.T) {
t.Parallel()
testCommittedStream(ctx, t, mwClient, bqClient, dataset)
Expand Down Expand Up @@ -455,6 +459,63 @@ func testDefaultStreamDynamicJSON(ctx context.Context, t *testing.T, mwClient *C
withDistinctValues("public", int64(2)))
}

func testDefaultStreamJSONData(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.ComplexTypeSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

md, descriptorProto := setupDynamicDescriptors(t, testdata.ComplexTypeSchema)

ms, err := mwClient.NewManagedStream(ctx,
WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)),
WithType(DefaultStream),
WithSchemaDescriptor(descriptorProto),
)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

sampleJSONData := [][]byte{
[]byte(`{"json_type":"{\"foo\": \"bar\"}"}`),
[]byte(`{"json_type":"{\"key\": \"value\"}"}`),
[]byte(`{"json_type":"\"a string\""}`),
}

var result *AppendResult
for k, v := range sampleJSONData {
message := dynamicpb.NewMessage(md)

// First, json->proto message
err = protojson.Unmarshal(v, message)
if err != nil {
t.Fatalf("failed to Unmarshal json message for row %d: %v", k, err)
}
// Then, proto message -> bytes.
b, err := proto.Marshal(message)
if err != nil {
t.Fatalf("failed to marshal proto bytes for row %d: %v", k, err)
}
result, err = ms.AppendRows(ctx, [][]byte{b})
if err != nil {
t.Errorf("single-row append %d failed: %v", k, err)
}
}

// Wait for the result to indicate ready, then validate.
o, err := result.GetResult(ctx)
if err != nil {
t.Errorf("result error for last send: %v", err)
}
if o != NoStreamOffset {
t.Errorf("offset mismatch, got %d want %d", o, NoStreamOffset)
}
validateTableConstraints(ctx, t, bqClient, testTable, "after send",
withExactRowCount(int64(len(sampleJSONData))))
}

func testBufferedStream(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.SimpleMessageSchema}); err != nil {
Expand Down Expand Up @@ -1389,6 +1450,7 @@ func TestIntegration_ProtoNormalization(t *testing.T) {
t.Run("ComplexType", func(t *testing.T) {
t.Parallel()
schema := testdata.ComplexTypeSchema
jsonData := "{\"foo\": \"bar\"}"
mesg := &testdata.ComplexType{
NestedRepeatedType: []*testdata.NestedType{
{
Expand All @@ -1404,6 +1466,7 @@ func TestIntegration_ProtoNormalization(t *testing.T) {
RangeType: &testdata.RangeTypeTimestamp{
End: proto.Int64(time.Now().UnixNano()),
},
JsonType: &jsonData,
}
b, err := proto.Marshal(mesg)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions bigquery/storage/managedwriter/testdata/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ var (
Type: bigquery.TimestampFieldType,
},
},
{
Name: "json_type",
Type: bigquery.JSONFieldType,
},
}

// We currently follow proto2 rules here, hence the well known types getting treated as records.
Expand Down
Loading

0 comments on commit 978d4a1

Please sign in to comment.