diff --git a/operator/builtin/output/google_cloud.go b/operator/builtin/output/google_cloud.go index 0237aa4e1..27f65088e 100644 --- a/operator/builtin/output/google_cloud.go +++ b/operator/builtin/output/google_cloud.go @@ -2,16 +2,17 @@ package output import ( "context" - "errors" "fmt" "io/ioutil" "net/url" + "reflect" "time" vkit "cloud.google.com/go/logging/apiv2" "github.com/golang/protobuf/ptypes" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/observiq/carbon/entry" + "github.com/observiq/carbon/errors" "github.com/observiq/carbon/internal/version" "github.com/observiq/carbon/operator" "github.com/observiq/carbon/operator/buffer" @@ -109,7 +110,7 @@ func (p *GoogleCloudOutput) Start() error { scope := "https://www.googleapis.com/auth/logging.write" switch { case p.credentials != "" && p.credentialsFile != "": - return errors.New("at most one of credentials or credentials_file can be configured") + return errors.NewError("at most one of credentials or credentials_file can be configured", "") case p.credentials != "": credentials, err = google.CredentialsFromJSON(context.Background(), []byte(p.credentials), scope) if err != nil { @@ -251,33 +252,40 @@ func (p *GoogleCloudOutput) createProtobufEntry(e *entry.Entry) (newEntry *logpb } newEntry.Severity = interpretSeverity(e.Severity) + err = setPayload(newEntry, e.Record) + if err != nil { + return nil, errors.Wrap(err, "set entry payload") + } + return newEntry, nil +} + +func setPayload(entry *logpb.LogEntry, record interface{}) (err error) { // Protect against the panic condition inside `jsonValueToStructValue` defer func() { if r := recover(); r != nil { - newEntry = nil err = fmt.Errorf(r.(string)) } }() - switch p := e.Record.(type) { + switch p := record.(type) { case string: - newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: p} + entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: p} case []byte: - newEntry.Payload = &logpb.LogEntry_TextPayload{TextPayload: string(p)} + entry.Payload = &logpb.LogEntry_TextPayload{TextPayload: string(p)} case map[string]interface{}: s := jsonMapToProtoStruct(p) - newEntry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: s} case map[string]string: fields := map[string]*structpb.Value{} for k, v := range p { fields[k] = jsonValueToStructValue(v) } - newEntry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: fields}} + entry.Payload = &logpb.LogEntry_JsonPayload{JsonPayload: &structpb.Struct{Fields: fields}} default: - return nil, fmt.Errorf("cannot convert record of type %T to a protobuf representation", e.Record) + return fmt.Errorf("cannot convert record of type %T to a protobuf representation", record) } - return newEntry, nil + return nil } func jsonMapToProtoStruct(m map[string]interface{}) *structpb.Struct { @@ -340,8 +348,79 @@ func jsonValueToStructValue(v interface{}) *structpb.Value { vals = append(vals, jsonValueToStructValue(e)) } return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}} + case []string: + var vals []*structpb.Value + for _, e := range x { + vals = append(vals, jsonValueToStructValue(e)) + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: vals}}} + default: + // Fallback to reflection for other types + return reflectToValue(reflect.ValueOf(v)) + } +} + +func reflectToValue(v reflect.Value) *structpb.Value { + switch v.Kind() { + case reflect.Bool: + return &structpb.Value{Kind: &structpb.Value_BoolValue{BoolValue: v.Bool()}} + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Int())}} + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(v.Uint())}} + case reflect.Float32, reflect.Float64: + return &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: v.Float()}} + case reflect.Ptr: + if v.IsNil() { + return nil + } + return reflectToValue(reflect.Indirect(v)) + case reflect.Array, reflect.Slice: + size := v.Len() + if size == 0 { + return nil + } + values := make([]*structpb.Value, size) + for i := 0; i < size; i++ { + values[i] = reflectToValue(v.Index(i)) + } + return &structpb.Value{Kind: &structpb.Value_ListValue{ListValue: &structpb.ListValue{Values: values}}} + case reflect.Struct: + t := v.Type() + size := v.NumField() + if size == 0 { + return nil + } + fields := make(map[string]*structpb.Value, size) + for i := 0; i < size; i++ { + name := t.Field(i).Name + // Better way? + if len(name) > 0 && 'A' <= name[0] && name[0] <= 'Z' { + fields[name] = reflectToValue(v.Field(i)) + } + } + if len(fields) == 0 { + return nil + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} + case reflect.Map: + keys := v.MapKeys() + if len(keys) == 0 { + return nil + } + fields := make(map[string]*structpb.Value, len(keys)) + for _, k := range keys { + if k.Kind() == reflect.String { + fields[k.String()] = reflectToValue(v.MapIndex(k)) + } + } + if len(fields) == 0 { + return nil + } + return &structpb.Value{Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{Fields: fields}}} default: - panic(fmt.Sprintf("bad type %T for JSON value", v)) + // Last resort + return &structpb.Value{Kind: &structpb.Value_StringValue{StringValue: fmt.Sprint(v)}} } } diff --git a/operator/builtin/output/google_cloud_test.go b/operator/builtin/output/google_cloud_test.go index 2db35a660..579a00286 100644 --- a/operator/builtin/output/google_cloud_test.go +++ b/operator/builtin/output/google_cloud_test.go @@ -258,6 +258,84 @@ func googleCloudSeverityTestCase(s entry.Severity, expected sev.LogSeverity) goo } } +type googleCloudProtobufTest struct { + name string + record interface{} +} + +func (g *googleCloudProtobufTest) Run(t *testing.T) { + t.Run(g.name, func(t *testing.T) { + e := &logpb.LogEntry{} + err := setPayload(e, g.record) + require.NoError(t, err) + }) +} + +func TestGoogleCloudSetPayload(t *testing.T) { + cases := []googleCloudProtobufTest{ + { + "string", + "test", + }, + { + "[]byte", + []byte("test"), + }, + { + "map[string]string", + map[string]string{"test": "val"}, + }, + { + "Nested_[]string", + map[string]interface{}{ + "sub": []string{"1", "2"}, + }, + }, + { + "Nested_[]int", + map[string]interface{}{ + "sub": []int{1, 2}, + }, + }, + { + "Nested_uint32", + map[string]interface{}{ + "sub": uint32(32), + }, + }, + { + "DeepNested_map", + map[string]interface{}{ + "0": map[string]map[string]map[string]string{ + "1": {"2": {"3": "test"}}, + }, + }, + }, + { + "DeepNested_slice", + map[string]interface{}{ + "0": [][][]string{{{"0", "1"}}}, + }, + }, + { + "AnonymousStruct", + map[string]interface{}{ + "0": struct{ Field string }{Field: "test"}, + }, + }, + { + "NamedStruct", + map[string]interface{}{ + "0": time.Now(), + }, + }, + } + + for _, tc := range cases { + tc.Run(t) + } +} + // Adapted from https://github.com/googleapis/google-cloud-go/blob/master/internal/testutil/server.go type loggingHandler struct { logpb.LoggingServiceV2Server