Skip to content

Commit

Permalink
Fallback to reflection for converting maps to proto structs
Browse files Browse the repository at this point in the history
  • Loading branch information
camdencheek committed Aug 14, 2020
1 parent d35ab06 commit bf65b6d
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 11 deletions.
101 changes: 90 additions & 11 deletions operator/builtin/output/google_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ package output

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/url"
"reflect"
"time"

vkit "cloud.google.com/go/logging/apiv2"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/errors"
"github.com/observiq/carbon/internal/version"
"github.com/observiq/carbon/operator"
"github.com/observiq/carbon/operator/buffer"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (p *GoogleCloudOutput) Start() error {
scope := "https://www.googleapis.com/auth/logging.write"
switch {
case p.credentials != "" && p.credentialsFile != "":
return errors.New("at most one of credentials or credentials_file can be configured")
return errors.NewError("at most one of credentials or credentials_file can be configured", "")
case p.credentials != "":
credentials, err = google.CredentialsFromJSON(context.Background(), []byte(p.credentials), scope)
if err != nil {
Expand Down Expand Up @@ -251,33 +252,40 @@ func (p *GoogleCloudOutput) createProtobufEntry(e *entry.Entry) (newEntry *logpb
}

newEntry.Severity = interpretSeverity(e.Severity)
err = setPayload(newEntry, e.Record)
if err != nil {
return nil, errors.Wrap(err, "set entry payload")
}

return newEntry, nil
}

func setPayload(entry *logpb.LogEntry, record interface{}) (err error) {
// Protect against the panic condition inside `jsonValueToStructValue`
defer func() {
if r := recover(); r != nil {
newEntry = nil
err = fmt.Errorf(r.(string))
}
}()
switch p := e.Record.(type) {
switch p := record.(type) {
case string:
newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: p}
entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: p}
case []byte:
newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: string(p)}
entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: string(p)}
case map[string]interface{}:
s := jsonMapToProtoStruct(p)
newEntry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s}
case map[string]string:
fields := map[string]*structpb.Value{}
for k, v := range p {
fields[k] = jsonValueToStructValue(v)
}
newEntry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: fields}}
entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: fields}}
default:
return nil, fmt.Errorf("cannot convert record of type %T to a protobuf representation", e.Record)
return fmt.Errorf("cannot convert record of type %T to a protobuf representation", record)
}

return newEntry, nil
return nil
}

func jsonMapToProtoStruct(m map[string]interface{}) *structpb.Struct {
Expand Down Expand Up @@ -340,8 +348,79 @@ func jsonValueToStructValue(v interface{}) *structpb.Value {
vals = append(vals, jsonValueToStructValue(e))
}
return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}}
case []string:
var vals []*structpb.Value
for _, e := range x {
vals = append(vals, jsonValueToStructValue(e))
}
return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}}
default:
// Fallback to reflection for other types
return reflectToValue(reflect.ValueOf(v))
}
}

func reflectToValue(v reflect.Value) *structpb.Value {
switch v.Kind() {
case reflect.Bool:
return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: v.Bool()}}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Int())}}
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Uint())}}
case reflect.Float32, reflect.Float64:
return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: v.Float()}}
case reflect.Ptr:
if v.IsNil() {
return nil
}
return reflectToValue(reflect.Indirect(v))
case reflect.Array, reflect.Slice:
size := v.Len()
if size == 0 {
return nil
}
values := make([]*structpb.Value, size)
for i := 0; i < size; i++ {
values[i] = reflectToValue(v.Index(i))
}
return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: values}}}
case reflect.Struct:
t := v.Type()
size := v.NumField()
if size == 0 {
return nil
}
fields := make(map[string]*structpb.Value, size)
for i := 0; i < size; i++ {
name := t.Field(i).Name
// Better way?
if len(name) > 0 && 'A' <= name[0] && name[0] <= 'Z' {
fields[name] = reflectToValue(v.Field(i))
}
}
if len(fields) == 0 {
return nil
}
return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}}
case reflect.Map:
keys := v.MapKeys()
if len(keys) == 0 {
return nil
}
fields := make(map[string]*structpb.Value, len(keys))
for _, k := range keys {
if k.Kind() == reflect.String {
fields[k.String()] = reflectToValue(v.MapIndex(k))
}
}
if len(fields) == 0 {
return nil
}
return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}}
default:
panic(fmt.Sprintf("bad type %T for JSON value", v))
// Last resort
return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: fmt.Sprint(v)}}
}
}

Expand Down
78 changes: 78 additions & 0 deletions operator/builtin/output/google_cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,84 @@ func googleCloudSeverityTestCase(s entry.Severity, expected sev.LogSeverity) goo
}
}

type googleCloudProtobufTest struct {
name string
record interface{}
}

func (g *googleCloudProtobufTest) Run(t *testing.T) {
t.Run(g.name, func(t *testing.T) {
e := &logpb.LogEntry{}
err := setPayload(e, g.record)
require.NoError(t, err)
})
}

func TestGoogleCloudSetPayload(t *testing.T) {
cases := []googleCloudProtobufTest{
{
"string",
"test",
},
{
"[]byte",
[]byte("test"),
},
{
"map[string]string",
map[string]string{"test": "val"},
},
{
"Nested_[]string",
map[string]interface{}{
"sub": []string{"1", "2"},
},
},
{
"Nested_[]int",
map[string]interface{}{
"sub": []int{1, 2},
},
},
{
"Nested_uint32",
map[string]interface{}{
"sub": uint32(32),
},
},
{
"DeepNested_map",
map[string]interface{}{
"0": map[string]map[string]map[string]string{
"1": {"2": {"3": "test"}},
},
},
},
{
"DeepNested_slice",
map[string]interface{}{
"0": [][][]string{{{"0", "1"}}},
},
},
{
"AnonymousStruct",
map[string]interface{}{
"0": struct{ Field string }{Field: "test"},
},
},
{
"NamedStruct",
map[string]interface{}{
"0": time.Now(),
},
},
}

for _, tc := range cases {
tc.Run(t)
}
}

// Adapted from https://github.com/googleapis/google-cloud-go/blob/master/internal/testutil/server.go
type loggingHandler struct {
logpb.LoggingServiceV2Server
Expand Down

0 comments on commit bf65b6d

Please sign in to comment.