Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

114 flatten nested objects #138

Merged
merged 20 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a42a00c
wip
KevRiver Nov 27, 2024
0c89feb
test(scanner): make a test that reproduces values of KV are wrapped b…
KevRiver Nov 28, 2024
9e7e61c
fix(json_handler): remove fmt.Sprintf("%q") and just assign the value
KevRiver Nov 28, 2024
ec37aec
wip(flatten nested object)
KevRiver Nov 28, 2024
150f49a
chore(scanner test): modify order of require.Equal arguments
KevRiver Dec 3, 2024
b977dd1
test(json handler test): add large number handling test
KevRiver Dec 3, 2024
40085c6
fix(json handler): fix to handle number fields with json.Number
KevRiver Dec 4, 2024
a2d6c7f
test(flattened nested objects): compare key-value pairs
KevRiver Dec 4, 2024
1388f56
Merge branch 'master' into 114-flatten-nested-objects
KevRiver Dec 4, 2024
d6e635e
test(flattening nested objects): rename test function names more clearly
KevRiver Dec 4, 2024
68745b7
fix(json handler): set json handler to wrap the string value with dou…
KevRiver Dec 4, 2024
75126c6
test(flattening nested objects): adjust expected result
KevRiver Dec 4, 2024
8787147
test(e2e): adjust expected result
KevRiver Dec 4, 2024
9bcedec
test(flattening nested objects/arrays): add a test that fails
KevRiver Dec 9, 2024
4e152b0
test(flattening nested objects/array): add a test that fails with han…
KevRiver Dec 9, 2024
7983ffc
test(json handler/array fields): add a test that fails to get flatten…
KevRiver Dec 9, 2024
99d5f69
test(json handler/nested array fields): add a test that fails to get …
KevRiver Dec 9, 2024
bf6f044
feat(json handler/flattened array fields): modify json handler to be …
KevRiver Dec 9, 2024
da5997c
feat(json handler): add the default case
KevRiver Dec 9, 2024
05f3d69
Merge branch 'master' into 114-flatten-nested-objects
KevRiver Dec 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions json_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -110,6 +111,11 @@ func getFlattenedFields(v map[string]interface{}) map[string]string {
extValues[key] = valTyped.String()
case string:
extValues[key] = fmt.Sprintf("%q", valTyped)
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(valTyped)
for k, v := range flattenedArrayFields {
extValues[key+"."+k] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(valTyped)
for keyNested, valStr := range flattenedFields {
Expand All @@ -122,6 +128,37 @@ func getFlattenedFields(v map[string]interface{}) map[string]string {
return extValues
}

// getFlattenedArrayFields flattens a JSON array into a map keyed by the
// element's index, e.g. [1,[2],{"a":"b"}] -> {"0":"1", "1.0":"2", "2.a":"b"}.
// Nested arrays and objects recurse, prefixing their keys with "<index>.".
//
// NOTE(review): string elements are stored unquoted here, while
// getFlattenedFields wraps string values with %q — confirm this asymmetry
// between array elements and object values is intentional.
func getFlattenedArrayFields(data []interface{}) map[string]string {
	flattened := make(map[string]string)
	for i, elem := range data {
		key := strconv.Itoa(i)
		switch vt := elem.(type) {
		case json.Number:
			// Prefer exact integer rendering; fall back to float, then to
			// the raw token if the number fits neither representation.
			if z, err := vt.Int64(); err == nil {
				flattened[key] = strconv.FormatInt(z, 10)
			} else if f, err := vt.Float64(); err == nil {
				// 'g' with precision -1 matches fmt's %g default: the
				// shortest representation that round-trips the value.
				flattened[key] = strconv.FormatFloat(f, 'g', -1, 64)
			} else {
				flattened[key] = vt.String()
			}
		case string:
			flattened[key] = vt
		case []interface{}:
			for k, val := range getFlattenedArrayFields(vt) {
				flattened[key+"."+k] = val
			}
		case map[string]interface{}:
			for k, val := range getFlattenedFields(vt) {
				flattened[key+"."+k] = val
			}
		default:
			// Booleans, nulls, and anything else fall back to fmt's
			// default formatting.
			flattened[key] = fmt.Sprintf("%v", vt)
		}
	}
	return flattened
}

// UnmarshalJSON sets the fields of the handler.
func (h *JSONHandler) UnmarshalJSON(data []byte) bool {

Expand Down Expand Up @@ -183,6 +220,11 @@ func (h *JSONHandler) UnmarshalJSON(data []byte) bool {
h.Fields[key] = v.String()
case string:
h.Fields[key] = fmt.Sprintf("%q", v)
case []interface{}:
flattenedArrayFields := getFlattenedArrayFields(v)
for k, v := range flattenedArrayFields {
h.Fields[key+"."+k] = v
}
case map[string]interface{}:
flattenedFields := getFlattenedFields(v)
for keyNested, val := range flattenedFields {
Expand Down
34 changes: 34 additions & 0 deletions json_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,37 @@ func TestJsonHandler_TryHandle_LargeNumbers(t *testing.T) {
require.Equal(t, "1.2345", h.Fields["storage.some.float"])
require.Equal(t, "1730187806608637000", h.Fields["storage.session.id"])
}

// TestJsonHandler_TryHandle_FlattendArrayFields verifies that an array of
// JSON objects is flattened into dotted "key.index.field" entries, with
// string values kept quoted.
func TestJsonHandler_TryHandle_FlattendArrayFields(t *testing.T) {
	handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
	ev := new(typesv1.StructuredLogEvent)
	raw := []byte(`{"peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}],"storage":{"session.id":1730187806608637000, "some": {"float": 1.2345}}}`)
	if !handler.TryHandle(raw, ev) {
		t.Fatalf("failed to handle log")
	}
	expected := map[string]string{
		"peers.0.ID":  "\"10.244.0.126:8083\"",
		"peers.0.URI": "\"10.244.0.126:8083\"",
		"peers.1.ID":  "\"10.244.0.206:8083\"",
		"peers.1.URI": "\"10.244.0.206:8083\"",
		"peers.2.ID":  "\"10.244.1.150:8083\"",
		"peers.2.URI": "\"10.244.1.150:8083\"",
	}
	for key, want := range expected {
		require.Equal(t, want, handler.Fields[key])
	}
}

// TestJsonHandler_TryHandle_FlattenedArrayFields_NestedArray verifies that
// arrays nested inside arrays flatten into dotted numeric paths, and that
// objects inside arrays keep their string values quoted.
func TestJsonHandler_TryHandle_FlattenedArrayFields_NestedArray(t *testing.T) {
	handler := humanlog.JSONHandler{Opts: humanlog.DefaultOptions()}
	ev := new(typesv1.StructuredLogEvent)
	raw := []byte(`{"peers":[[1,2,3.14],[4,50.55,[6,7]],["hello","world"],{"foo":"bar"}]}`)
	if !handler.TryHandle(raw, ev) {
		t.Fatalf("failed to handle log")
	}
	expected := map[string]string{
		"peers.0.0":   "1",
		"peers.0.1":   "2",
		"peers.0.2":   "3.14",
		"peers.1.0":   "4",
		"peers.1.1":   "50.55",
		"peers.1.2.0": "6",
		"peers.1.2.1": "7",
		"peers.2.0":   "hello",
		"peers.2.1":   "world",
		"peers.3.foo": "\"bar\"",
	}
	for key, want := range expected {
		require.Equal(t, want, handler.Fields[key])
	}
}
190 changes: 190 additions & 0 deletions scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,196 @@ func TestFlatteningNestedObjects_simple(t *testing.T) {
}
}

// TestFlatteningNestedObjects_with_arrays checks that a scanned JSON log
// line containing both a nested object ("source") and an array of objects
// ("peers") produces flattened dot-separated KV pairs.
func TestFlatteningNestedObjects_with_arrays(t *testing.T) {
	payload := `{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","source":{"function":"main.realMain.func5.1","file":"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go","line":407},"msg":"galaxycache peers updated","selfURI":"10.244.0.126:8083","peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]}`

	now := time.Date(2024, 12, 9, 0, 0, 0, 0, time.UTC)
	want := []*typesv1.LogEvent{
		{
			ParsedAt: timestamppb.New(now),
			Raw:      []byte(`{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","source":{"function":"main.realMain.func5.1","file":"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go","line":407},"msg":"galaxycache peers updated","selfURI":"10.244.0.126:8083","peers":[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]}`),
			Structured: &typesv1.StructuredLogEvent{
				Timestamp: timestamppb.New(time.Date(2024, 12, 5, 6, 40, 35, 247902137, time.UTC)),
				Lvl:       "DEBUG",
				Msg:       "galaxycache peers updated",
				Kvs: []*typesv1.KV{
					{
						Key:   "selfURI",
						Value: "\"10.244.0.126:8083\"",
					},
					{
						Key:   "source.function",
						Value: "\"main.realMain.func5.1\"",
					},
					{
						Key:   "source.file",
						Value: "\"github.com/humanlogio/apisvc/cmd/apisvc/server_cmd.go\"",
					},
					{
						Key:   "source.line",
						Value: "407",
					},
					{
						Key:   "peers.0.ID",
						Value: "\"10.244.0.126:8083\"",
					},
					{
						Key:   "peers.0.URI",
						Value: "\"10.244.0.126:8083\"",
					},
					{
						Key:   "peers.1.ID",
						Value: "\"10.244.0.206:8083\"",
					},
					{
						Key:   "peers.1.URI",
						Value: "\"10.244.0.206:8083\"",
					},
					{
						Key:   "peers.2.ID",
						Value: "\"10.244.1.150:8083\"",
					},
					{
						Key:   "peers.2.URI",
						Value: "\"10.244.1.150:8083\"",
					},
				},
			},
		},
	}

	src := strings.NewReader(payload)
	opts := DefaultOptions()
	opts.timeNow = func() time.Time {
		return now
	}

	sink := bufsink.NewSizedBufferedSink(100, nil)
	ctx := context.Background()
	err := Scan(ctx, src, sink, opts)
	require.NoError(t, err)

	got := sink.Buffered
	require.Equal(t, len(want), len(got)) // assume that there's no skipped log events

	n := len(want)
	for i := 0; i < n; i++ {
		// Compare KV pairs as maps so the assertion is order-independent.
		actualKvs := make(map[string]string)
		for _, kv := range got[i].Structured.Kvs {
			actualKvs[kv.Key] = kv.Value
		}
		expectedKvs := make(map[string]string)
		for _, kv := range want[i].Structured.Kvs {
			expectedKvs[kv.Key] = kv.Value
		}
		// require.Equal takes (t, expected, actual): pass "want" first so
		// failure diffs label each side correctly.
		require.Equal(t, want[i].ParsedAt, got[i].ParsedAt)
		require.Equal(t, want[i].Raw, got[i].Raw)
		require.Equal(t, want[i].Structured.Timestamp, got[i].Structured.Timestamp)
		require.Equal(t, want[i].Structured.Msg, got[i].Structured.Msg)
		require.Equal(t, want[i].Structured.Lvl, got[i].Structured.Lvl)
		require.Equal(t, expectedKvs, actualKvs)
	}
}

// TestFlatteningNestedObjects_with_nested_arrays checks that arrays nested
// inside arrays (including objects at the innermost level) are scanned into
// flattened dot-separated KV pairs with numeric path segments.
func TestFlatteningNestedObjects_with_nested_arrays(t *testing.T) {
	payload := `{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","msg":"galaxycache peers updated","peers":[[1,2,3],[4,5,6],[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]]}`

	now := time.Date(2024, 12, 9, 0, 0, 0, 0, time.UTC)
	want := []*typesv1.LogEvent{
		{
			ParsedAt: timestamppb.New(now),
			Raw:      []byte(`{"time":"2024-12-05T06:40:35.247902137Z","level":"DEBUG","msg":"galaxycache peers updated","peers":[[1,2,3],[4,5,6],[{"ID":"10.244.0.126:8083","URI":"10.244.0.126:8083"},{"ID":"10.244.0.206:8083","URI":"10.244.0.206:8083"},{"ID":"10.244.1.150:8083","URI":"10.244.1.150:8083"}]]}`),
			Structured: &typesv1.StructuredLogEvent{
				Timestamp: timestamppb.New(time.Date(2024, 12, 5, 6, 40, 35, 247902137, time.UTC)),
				Lvl:       "DEBUG",
				Msg:       "galaxycache peers updated",
				Kvs: []*typesv1.KV{
					{
						Key:   "peers.0.0",
						Value: "1",
					},
					{
						Key:   "peers.0.1",
						Value: "2",
					},
					{
						Key:   "peers.0.2",
						Value: "3",
					},
					{
						Key:   "peers.1.0",
						Value: "4",
					},
					{
						Key:   "peers.1.1",
						Value: "5",
					},
					{
						Key:   "peers.1.2",
						Value: "6",
					},
					{
						Key:   "peers.2.0.ID",
						Value: "\"10.244.0.126:8083\"",
					},
					{
						Key:   "peers.2.0.URI",
						Value: "\"10.244.0.126:8083\"",
					},
					{
						Key:   "peers.2.1.ID",
						Value: "\"10.244.0.206:8083\"",
					},
					{
						Key:   "peers.2.1.URI",
						Value: "\"10.244.0.206:8083\"",
					},
					{
						Key:   "peers.2.2.ID",
						Value: "\"10.244.1.150:8083\"",
					},
					{
						Key:   "peers.2.2.URI",
						Value: "\"10.244.1.150:8083\"",
					},
				},
			},
		},
	}

	src := strings.NewReader(payload)
	opts := DefaultOptions()
	opts.timeNow = func() time.Time {
		return now
	}

	sink := bufsink.NewSizedBufferedSink(100, nil)
	ctx := context.Background()
	err := Scan(ctx, src, sink, opts)
	require.NoError(t, err)

	got := sink.Buffered
	require.Equal(t, len(want), len(got)) // assume that there's no skipped log events

	n := len(want)
	for i := 0; i < n; i++ {
		// Compare KV pairs as maps so the assertion is order-independent.
		actualKvs := make(map[string]string)
		for _, kv := range got[i].Structured.Kvs {
			actualKvs[kv.Key] = kv.Value
		}
		expectedKvs := make(map[string]string)
		for _, kv := range want[i].Structured.Kvs {
			expectedKvs[kv.Key] = kv.Value
		}
		// require.Equal takes (t, expected, actual): pass "want" first so
		// failure diffs label each side correctly.
		require.Equal(t, want[i].ParsedAt, got[i].ParsedAt)
		require.Equal(t, want[i].Raw, got[i].Raw)
		require.Equal(t, want[i].Structured.Timestamp, got[i].Structured.Timestamp)
		require.Equal(t, want[i].Structured.Msg, got[i].Structured.Msg)
		require.Equal(t, want[i].Structured.Lvl, got[i].Structured.Lvl)
		require.Equal(t, expectedKvs, actualKvs)
	}
}

func pjsonslice[E proto.Message](m []E) string {
sb := strings.Builder{}
for _, e := range m {
Expand Down
Loading