Skip to content

Commit

Permalink
114 flatten nested objects (humanlogio#138)
Browse files Browse the repository at this point in the history
* wip

* test(scanner): make a test that reproduces values of KV are wrapped by extra double quotations.

* fix(json_handler): remove fmt.Sprintf("%q") and just assign the value

* wip(flatten nested object)

* chore(scanner test): modify order of require.Equal arguments

* test(json handler test): add large number handling test

* fix(json handler): fix to handle number fields with json.Number

* test(flattend nested objects): compare key-value pairs

* test(flattening nested objects): rename test function names more clearly

* fix(json handler): set json handler to wrap the string value with double quotation again

* test(flattening nested objects): adjust expected result

* test(e2e): adjust expecting result

* test(flattening nested objects/arrays): add a test that fails

* test(flattening nested objects/array): add a test that fails with handle nested arrays

* test(json handler/array fields): add a test that fails to get flattened arrays

* test(json handler/nested array fields): add a test that fails to get flattened array fields

* feat(json handler/flattened array fields): modify json handler to be able to flatten the array fields

* feat(json handler): add the default case
KevRiver authored Dec 11, 2024
1 parent 4187e2a commit d50f608
Showing 3 changed files with 266 additions and 0 deletions.
42 changes: 42 additions & 0 deletions json_handler.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

@@ -110,6 +111,11 @@ func getFlattenedFields(v map[string]interface{}) map[string]string {
extValues[key] = valTyped.String()
case string:
extValues[key] = fmt.Sprintf("%q", valTyped)
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(valTyped)
for k, v := range flattenedArrayFields {
extValues[key+"."+k] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(valTyped)
for keyNested, valStr := range flattenedFields {
@@ -122,6 +128,37 @@ func getFlattenedFields(v map[string]interface{}) map[string]string {
return extValues
}

func getFlattenedArrayFields(data []interface{}) map[string]string {
flattened := make(map[string]string)
for i, v := range data {
switch vt := v.(type) {
case json.Number:
if z, err := vt.Int64(); err == nil {
flattened[strconv.Itoa(i)] = fmt.Sprintf("%d", z)
} else if f, err := vt.Float64(); err == nil {
flattened[strconv.Itoa(i)] = fmt.Sprintf("%g", f)
} else {
flattened[strconv.Itoa(i)] = vt.String()
}
case string:
flattened[strconv.Itoa(i)] = vt
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(vt)
for k, v := range flattenedArrayFields {
flattened[fmt.Sprintf("%d.%s", i, k)] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(vt)
for k, v := range flattenedFields {
flattened[fmt.Sprintf("%d.%s", i, k)] = v
}
default:
flattened[strconv.Itoa(i)] = fmt.Sprintf("%v", vt)
}
}
return flattened
}

// UnmarshalJSON sets the fields of the handler.
func (h *JSONHandler) UnmarshalJSON(data []byte) bool {

@@ -183,6 +220,11 @@ func (h *JSONHandler) UnmarshalJSON(data []byte) bool {
h.Fields[key] = v.String()
case string:
h.Fields[key] = fmt.Sprintf("%q", v)
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(v)
for k, v := range flattenedArrayFields {
h.Fields[key+"."+k] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(v)
for keyNested, val := range flattenedFields {
34 changes: 34 additions & 0 deletions json_handler_test.go
Original file line number Diff line number Diff line change
@@ -165,3 +165,37 @@ func TestJsonHandler_TryHandle_LargeNumbers(t *testing.T) {
require.Equal(t, "1.2345", h.Fields["storage.some.float"])
require.Equal(t, "1730187806608637000", h.Fields["storage.session.id"])
}

func TestJsonHandler_TryHandle_FlattendArrayFields(t *testing.T) {
handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
ev := new(typesv1.StructuredLogEvent)
raw := []byte(`{"peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}],"storage":{"session.id":1730187806608637000, "some": {"float": 1.2345}}}`)
if !handler.TryHandle(raw, ev) {
t.Fatalf("failed to handle log")
}
require.Equal(t, "\"10.244.0.126:8083\"", handler.Fields["peers.0.ID"])
require.Equal(t, "\"10.244.0.126:8083\"", handler.Fields["peers.0.URI"])
require.Equal(t, "\"10.244.0.206:8083\"", handler.Fields["peers.1.ID"])
require.Equal(t, "\"10.244.0.206:8083\"", handler.Fields["peers.1.URI"])
require.Equal(t, "\"10.244.1.150:8083\"", handler.Fields["peers.2.ID"])
require.Equal(t, "\"10.244.1.150:8083\"", handler.Fields["peers.2.URI"])
}

func TestJsonHandler_TryHandle_FlattenedArrayFields_NestedArray(t *testing.T) {
handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
ev := new(typesv1.StructuredLogEvent)
raw := []byte(`{"peers":[[1,2,3.14],[4,50.55,[6,7]],["hello","world"],{"foo":"bar"}]}`)
if !handler.TryHandle(raw, ev) {
t.Fatalf("failed to handle log")
}
require.Equal(t, "1", handler.Fields["peers.0.0"])
require.Equal(t, "2", handler.Fields["peers.0.1"])
require.Equal(t, "3.14", handler.Fields["peers.0.2"])
require.Equal(t, "4", handler.Fields["peers.1.0"])
require.Equal(t, "50.55", handler.Fields["peers.1.1"])
require.Equal(t, "6", handler.Fields["peers.1.2.0"])
require.Equal(t, "7", handler.Fields["peers.1.2.1"])
require.Equal(t, "hello", handler.Fields["peers.2.0"])
require.Equal(t, "world", handler.Fields["peers.2.1"])
require.Equal(t, "\"bar\"", handler.Fields["peers.3.foo"])
}
190 changes: 190 additions & 0 deletions scanner_test.go
Original file line number Diff line number Diff line change
@@ -204,6 +204,196 @@ func TestFlatteningNestedObjects_simple(t *testing.T) {
}
}

func TestFlatteningNestedObjects_with_arrays(t *testing.T) {
payload := `{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","source":{"function":"main.realMain.func5.1","file":"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go","line":407},"msg":"galaxycache peers updated","selfURI":"10.244.0.126:8083","peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]}`

now := time.Date(2024, 12, 9, 0, 0, 0, 0, time.UTC)
want := []*typesv1.LogEvent{
{
ParsedAt: timestamppb.New(now),
Raw: []byte(`{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","source":{"function":"main.realMain.func5.1","file":"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go","line":407},"msg":"galaxycache peers updated","selfURI":"10.244.0.126:8083","peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]}`),
Structured: &typesv1.StructuredLogEvent{
Timestamp: timestamppb.New(time.Date(2024, 12, 5, 6, 40, 35, 247902137, time.UTC)),
Lvl: "DEBUG",
Msg: "galaxycache peers updated",
Kvs: []*typesv1.KV{
{
Key: "selfURI",
Value: "\"10.244.0.126:8083\"",
},
{
Key: "source.function",
Value: "\"main.realMain.func5.1\"",
},
{
Key: "source.file",
Value: "\"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go\"",
},
{
Key: "source.line",
Value: "407",
},
{
Key: "peers.0.ID",
Value: "\"10.244.0.126:8083\"",
},
{
Key: "peers.0.URI",
Value: "\"10.244.0.126:8083\"",
},
{
Key: "peers.1.ID",
Value: "\"10.244.0.206:8083\"",
},
{
Key: "peers.1.URI",
Value: "\"10.244.0.206:8083\"",
},
{
Key: "peers.2.ID",
Value: "\"10.244.1.150:8083\"",
},
{
Key: "peers.2.URI",
Value: "\"10.244.1.150:8083\"",
},
},
},
},
}

src := strings.NewReader(payload)
opts := DefaultOptions()
opts.timeNow = func() time.Time {
return now
}

sink := bufsink.NewSizedBufferedSink(100, nil)
ctx := context.Background()
err := Scan(ctx, src, sink, opts)
require.NoError(t, err)

got := sink.Buffered
require.Equal(t, len(want), len(got)) // assume that there's no skipped log events

n := len(want)
for i := 0; i < n; i++ {
actualKvs := make(map[string]string)
for _, kv := range got[i].Structured.Kvs {
actualKvs[kv.Key] = kv.Value
}
expectedKvs := make(map[string]string)
for _, kv := range want[i].Structured.Kvs {
expectedKvs[kv.Key] = kv.Value
}
require.Equal(t, got[i].ParsedAt, want[i].ParsedAt)
require.Equal(t, got[i].Raw, want[i].Raw)
require.Equal(t, got[i].Structured.Timestamp, want[i].Structured.Timestamp)
require.Equal(t, got[i].Structured.Msg, want[i].Structured.Msg)
require.Equal(t, got[i].Structured.Lvl, want[i].Structured.Lvl)
require.Equal(t, expectedKvs, actualKvs)
}
}

func TestFlatteningNestedObjects_with_nested_arrays(t *testing.T) {
payload := `{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","msg":"galaxycache peers updated","peers":[[1,2,3],[4,5,6],[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]]}`

now := time.Date(2024, 12, 9, 0, 0, 0, 0, time.UTC)
want := []*typesv1.LogEvent{
{
ParsedAt: timestamppb.New(now),
Raw: []byte(`{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","msg":"galaxycache peers updated","peers":[[1,2,3],[4,5,6],[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]]}`),
Structured: &typesv1.StructuredLogEvent{
Timestamp: timestamppb.New(time.Date(2024, 12, 5, 6, 40, 35, 247902137, time.UTC)),
Lvl: "DEBUG",
Msg: "galaxycache peers updated",
Kvs: []*typesv1.KV{
{
Key: "peers.0.0",
Value: "1",
},
{
Key: "peers.0.1",
Value: "2",
},
{
Key: "peers.0.2",
Value: "3",
},
{
Key: "peers.1.0",
Value: "4",
},
{
Key: "peers.1.1",
Value: "5",
},
{
Key: "peers.1.2",
Value: "6",
},
{
Key: "peers.2.0.ID",
Value: "\"10.244.0.126:8083\"",
},
{
Key: "peers.2.0.URI",
Value: "\"10.244.0.126:8083\"",
},
{
Key: "peers.2.1.ID",
Value: "\"10.244.0.206:8083\"",
},
{
Key: "peers.2.1.URI",
Value: "\"10.244.0.206:8083\"",
},
{
Key: "peers.2.2.ID",
Value: "\"10.244.1.150:8083\"",
},
{
Key: "peers.2.2.URI",
Value: "\"10.244.1.150:8083\"",
},
},
},
},
}

src := strings.NewReader(payload)
opts := DefaultOptions()
opts.timeNow = func() time.Time {
return now
}

sink := bufsink.NewSizedBufferedSink(100, nil)
ctx := context.Background()
err := Scan(ctx, src, sink, opts)
require.NoError(t, err)

got := sink.Buffered
require.Equal(t, len(want), len(got)) // assume that there's no skipped log events

n := len(want)
for i := 0; i < n; i++ {
actualKvs := make(map[string]string)
for _, kv := range got[i].Structured.Kvs {
actualKvs[kv.Key] = kv.Value
}
expectedKvs := make(map[string]string)
for _, kv := range want[i].Structured.Kvs {
expectedKvs[kv.Key] = kv.Value
}
require.Equal(t, got[i].ParsedAt, want[i].ParsedAt)
require.Equal(t, got[i].Raw, want[i].Raw)
require.Equal(t, got[i].Structured.Timestamp, want[i].Structured.Timestamp)
require.Equal(t, got[i].Structured.Msg, want[i].Structured.Msg)
require.Equal(t, got[i].Structured.Lvl, want[i].Structured.Lvl)
require.Equal(t, expectedKvs, actualKvs)
}
}

func pjsonslice[E proto.Message](m []E) string {
sb := strings.Builder{}
for _, e := range m {

0 comments on commit d50f608

Please sign in to comment.