Merge pull request #9 from katcipis/supportNonJSON
Support non JSON data on stream/list of docs
katcipis authored Sep 29, 2020
2 parents 523733d + 0b8730e commit 5f93676
Showing 3 changed files with 258 additions and 52 deletions.
28 changes: 20 additions & 8 deletions README.md
@@ -60,22 +60,22 @@ go install -i github.com/katcipis/jtoh/cmd/jtoh
# What

jtoh will produce one line of text for each JSON document found on the list/stream,
accepting a selector string as a parameter indicating which fields are going
to be used to compose each line and what is the separator between each field:

```
<source of JSON list> | jtoh "<sep>field1<sep>field2<sep>field3.name"
```

Where **<sep>** is the first character and will be considered the separator.
It is used to separate the different field selectors and will also be used
as the separator on the output, so this:

```
<source of JSON list> | jtoh ":field1:field2"
```

Will generate a stream of outputs like this:

```
data1:data2
```

@@ -117,11 +117,10 @@ You will probably have a long list of something like this:
}
```

In this case the application does no JSON structured logging;
there is a lot of data around the actual application log that
can be useful for filtering, but after being used for filtering
it is pure cognitive noise.

Using jtoh like this:

@@ -140,5 +139,18 @@ get when the application structures the log entries as JSON and you get the
logs directly from Kubernetes using kubectl like this:

```
TODO: Kubernetes examples :-)
```

# Error Handling

One thing that makes jtoh very different from usual JSON parsing tools is
how it handles errors. Anything that is not JSON is just echoed back,
and jtoh keeps trying to parse the rest of the data.

The idea is to cover scenarios where applications have hybrid logs, where
sometimes an entry is JSON and sometimes it is just a stack trace or something
else. These scenarios are not ideal and the software should be fixed, but
life is not ideal, so if you are in this situation jtoh may help you
analyze the logs :-) (and hopefully in time you will also fix the logs
so they become uniform/consistent).
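
For illustration, here is a minimal Go sketch of that behavior through the
package API shown further down in this commit (the import path
github.com/katcipis/jtoh and the sample log lines are assumptions made for
the example, not part of the commit):

```
package main

import (
	"os"
	"strings"

	"github.com/katcipis/jtoh"
)

func main() {
	// Hybrid input: JSON log entries mixed with a plain text line.
	logs := strings.NewReader(`{"level":"info","msg":"starting"}
panic: something exploded (definitely not JSON)
{"level":"info","msg":"done"}
`)

	// ":" is the separator, "level" and "msg" are the selected fields.
	j, err := jtoh.New(":level:msg")
	if err != nil {
		panic(err)
	}

	// JSON entries are projected to "info:starting" and "info:done";
	// the non-JSON line is echoed back, as described above.
	j.Do(logs, os.Stdout)
}
```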
126 changes: 97 additions & 29 deletions jtoh.go
@@ -1,9 +1,11 @@
package jtoh

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"strings"
)

@@ -49,39 +51,66 @@ func New(s string) (J, error) {
}

// Do receives a json stream as input and transforms it
// in lines of text (newline-delimited) which is
// then written in the provided writer.
//
// This function will block until all data is read from the input
// and written on the output.
func (j J) Do(jsonInput io.Reader, linesOutput io.Writer) {
	jsonInput, ok := isList(jsonInput)
	// Why not bufio ? what we need here is kinda like
	// buffered io, but not exactly the same (was not able to
	// come up with a better name to it).
	bufinput := bufferedReader{r: jsonInput}
	dec := json.NewDecoder(&bufinput)

	if ok {
		// WHY: To handle properly gigantic lists of JSON objs
		// Really don't need the return value, but linters can be annoying =P
		_, _ = dec.Token()
		bufinput.reset()
	}

	var errBuffer []byte

	// TODO: Right now we have space complexity O(N) when the input is not JSON
	// For huge chunks of non JSON data this may be a problem
	for bufinput.hasData() {
		for dec.More() {
			m := map[string]interface{}{}
			err := dec.Decode(&m)
			dataUsedOnDecode := bufinput.readBuffer()
			bufinput.reset()

			if err != nil {
				errBuffer = append(errBuffer, dataUsedOnDecode...)
				dec = json.NewDecoder(&bufinput)
				continue
			}

			writeErrs(linesOutput, errBuffer)
			errBuffer = nil

			fieldValues := make([]string, len(j.fieldSelectors))
			for i, fieldSelector := range j.fieldSelectors {
				fieldValues[i] = selectField(fieldSelector, m)
			}
			fmt.Fprint(linesOutput, strings.Join(fieldValues, j.separator)+"\n")
		}
		dec = json.NewDecoder(&bufinput)
	}

	writeErrs(linesOutput, errBuffer)
}

func writeErrs(w io.Writer, errBuffer []byte) {
	if len(errBuffer) == 0 {
		return
	}
	errBuffer = append(errBuffer, '\n')
	n, err := w.Write(errBuffer)
	if err != nil {
		fmt.Fprintf(os.Stderr, "jtoh:error writing error buffer: wrote %d bytes, details: %v\n", n, err)
	}
}

@@ -145,16 +174,8 @@ func isList(jsons io.Reader) (io.Reader, bool) {
			continue
		}

		isList := firstToken == '['
		return io.MultiReader(bytes.NewBuffer([]byte{firstToken}), jsons), isList
	}
}
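
For context (most of the isList body is collapsed in the hunk above), the
peek-and-prepend pattern it relies on looks roughly like this simplified,
illustrative sketch (names and details are not the actual implementation; it
assumes the bytes and io imports):

```
// peekIsList reads single bytes until the first non-space character,
// decides whether the stream is a JSON array, and puts the consumed
// byte back in front of the stream so the decoder still sees it.
func peekIsList(jsons io.Reader) (io.Reader, bool) {
	buf := make([]byte, 1)
	for {
		n, err := jsons.Read(buf)
		if err != nil {
			return jsons, false
		}
		if n == 0 || isSpace(buf[0]) {
			continue
		}
		prefixed := io.MultiReader(bytes.NewBuffer([]byte{buf[0]}), jsons)
		return prefixed, buf[0] == '['
	}
}

func isSpace(c byte) bool {
	return c == ' ' || c == '\t' || c == '\n' || c == '\r'
}
```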

@@ -173,3 +194,50 @@ func trimSpaces(s []string) []string {
	}
	return trimmed
}

// bufferedReader is not exactly like the bufio on stdlib.
// The idea is to use it as a means to buffer read data
// until reset is called. We need this so when
// the JSON decoder finds an error in the stream we can retrieve
// exactly how much has been read between the last successful
// decode and the current error and echo it.
//
// To guarantee that, we provide data byte per byte, which is
// not terribly efficient but was the only way so far to be sure
// (assuming that the json decoder does no lookahead) that when
// an error occurs on the json decoder we have the exact byte stream that
// caused the error (I would welcome with open arms a better solution x_x).
type bufferedReader struct {
	r       io.Reader
	buffer  []byte
	readErr error
}

func (b *bufferedReader) Read(data []byte) (int, error) {
	if len(data) == 0 {
		return 0, nil
	}

	data = data[:1]
	n, err := b.r.Read(data)

	b.readErr = err

	if n > 0 {
		b.buffer = append(b.buffer, data[0])
	}

	return n, err
}

func (b *bufferedReader) hasData() bool {
	return b.readErr == nil
}

func (b *bufferedReader) readBuffer() []byte {
	return b.buffer
}

func (b *bufferedReader) reset() {
	b.buffer = make([]byte, 0, 1024)
}
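
To make the byte-per-byte property concrete, here is a small, self-contained
sketch (not part of this commit; captureReader is an illustrative stand-in for
bufferedReader) showing that when the decoder is fed one byte per Read, a
failed Decode leaves behind exactly the bytes it read:

```
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// captureReader hands out at most one byte per Read call and remembers
// every byte it has handed out, mimicking bufferedReader above.
type captureReader struct {
	r   io.Reader
	buf []byte
}

func (c *captureReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	p = p[:1]
	n, err := c.r.Read(p)
	if n > 0 {
		c.buf = append(c.buf, p[0])
	}
	return n, err
}

func main() {
	in := &captureReader{r: strings.NewReader(`not json {"ok":true}`)}
	dec := json.NewDecoder(in)

	var v map[string]interface{}
	if err := dec.Decode(&v); err != nil {
		// Because the decoder only ever received one byte per Read,
		// in.buf holds exactly the bytes it read before failing ("no").
		fmt.Printf("decode failed after reading %q: %v\n", in.buf, err)
	}
}
```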
