Skip to content

Commit

Permalink
[filebeat][gcs] Added support for more mime types, offset tracking vi…
Browse files Browse the repository at this point in the history
…a cursor, automatic splitting at root level (elastic#34155)

* initial commit -m

* added support for more mime types, offset tracking via cursor & root level split func

* updated asciidoc

* updated NOTICE.txt

* updated PR according to suggestions

* optimised code blocks as per PR suggestions

* addressed linting issues

* updated with PR suggestions
  • Loading branch information
ShourieG authored Jan 23, 2023
1 parent cb8f4f7 commit 69ebd98
Show file tree
Hide file tree
Showing 18 changed files with 928 additions and 214 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]


*Filebeat*
- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- [httpjson] Improved error handling during pagination with chaining & split processor {pull}34127[34127]
- [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981]
- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568]
Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ even though it can get expensive with dealing with a very large number of files.
describing said error.

[id="supported-types-gcs"]
NOTE: Currently only `JSON` is supported with respect to object/file formats. As for authentication types, we currently have support for
`json credential keys` and `credential files`. If a download for a file/object fails or gets interrupted, the download is retried for 2 times.
This is currently not user configurable.
NOTE: Currently only `JSON` and `NDJSON` are supported object/file formats. Objects/files may also be gzip compressed.
"JSON credential keys" and "credential files" are supported authentication types.
If an array is present as the root object for an object/file, it is automatically split into individual objects and processed.
If a download for a file/object fails or gets interrupted, the download is retried 2 times. This is currently not user configurable.


[id="basic-config-gcs"]
Expand Down Expand Up @@ -90,6 +91,7 @@ calls to the bucket list api if it exceeds the given value. Each iteration consi
"_id": "gcs-test-new-data_3.json-worker-1"
},
"log": {
"offset": 200,
"file": {
"path": "gs://gcs-test-new/data_3.json"
}
Expand Down
52 changes: 17 additions & 35 deletions x-pack/filebeat/input/gcs/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,75 +21,58 @@ var (
errUnsupportedType = errors.New("only JSON objects are accepted")
)

// httpReadJSON accepts json file data in the form of an io.Reader, decodes the json data and returns the decoded
// data in the form of an object represented by a map[string]interface{}. Along with the objectified json data,
// this function also returns the raw json message in binary format. The status value in the return parameter
// is the http status value and have the following values : -
// 1) - StatusNotAcceptable (406) if body is missing
// 2) - StatusBadRequest (400) if the decoding fails
// 3) - StatusOK (200) if the decoding is successful
func httpReadJSON(body io.Reader) (objs []mapstr.M, rawMessages []json.RawMessage, status int, err error) {
// decodeJSON accepts json file data in the form of an io.Reader, decodes the json data and returns the decoded
// data in the form of an object represented by a map[string]interface{}.
func decodeJSON(body io.Reader) ([]mapstr.M, error) {
if body == http.NoBody {
return nil, nil, http.StatusNotAcceptable, errBodyEmpty
return nil, errBodyEmpty
}
obj, rawMessage, err := decodeJSON(body)
if err != nil {
return nil, nil, http.StatusBadRequest, err
}
return obj, rawMessage, http.StatusOK, err
}

func decodeJSON(body io.Reader) ([]mapstr.M, []json.RawMessage, error) {
var objs []mapstr.M
var rawMessages []json.RawMessage
decoder := json.NewDecoder(body)
for decoder.More() {
var raw json.RawMessage
if err := decoder.Decode(&raw); err != nil {
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
break
}
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", decoder.InputOffset(), err)
}
switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
rawMessages = append(rawMessages, raw)
case []interface{}:
nobjs, nrawMessages, err := decodeJSONArray(bytes.NewReader(raw), decoder.InputOffset())
nobjs, err := decodeJSONArray(bytes.NewReader(raw), decoder.InputOffset())
if err != nil {
return nil, nil, fmt.Errorf("recursive error %d: %w", decoder.InputOffset(), err)
return nil, fmt.Errorf("recursive error %d: %w", decoder.InputOffset(), err)
}
objs = append(objs, nobjs...)
rawMessages = append(rawMessages, nrawMessages...)
default:
return nil, nil, errUnsupportedType
return nil, errUnsupportedType
}
}
for i := range objs {
jsontransform.TransformNumbers(objs[i])
}
return objs, rawMessages, nil
return objs, nil
}

func decodeJSONArray(raw *bytes.Reader, parentOffset int64) ([]mapstr.M, []json.RawMessage, error) {
func decodeJSONArray(raw *bytes.Reader, parentOffset int64) ([]mapstr.M, error) {
var objs []mapstr.M
var rawMessages []json.RawMessage
dec := newJSONDecoder(raw)
token, err := dec.Token()
if err != nil {
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
return nil, nil, nil
return nil, nil
}
return nil, nil, fmt.Errorf("failed reading JSON array: %w", err)
return nil, fmt.Errorf("failed reading JSON array: %w", err)
}
if token != json.Delim('[') {
return nil, nil, fmt.Errorf("malformed JSON array, not starting with delimiter [ at position: %d", parentOffset+dec.InputOffset())
return nil, fmt.Errorf("malformed JSON array, not starting with delimiter [ at position: %d", parentOffset+dec.InputOffset())
}

for dec.More() {
Expand All @@ -98,21 +81,20 @@ func decodeJSONArray(raw *bytes.Reader, parentOffset int64) ([]mapstr.M, []json.
if err == io.EOF { //nolint:errorlint // This will never be a wrapped error.
break
}
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
return nil, fmt.Errorf("malformed JSON object at stream position %d: %w", parentOffset+dec.InputOffset(), err)
}

m, ok := obj.(map[string]interface{})
if ok {
rawMessages = append(rawMessages, raw)
objs = append(objs, m)
}
}
return objs, rawMessages, nil
return objs, nil
}

func newJSONDecoder(r io.Reader) *json.Decoder {
Expand Down
96 changes: 29 additions & 67 deletions x-pack/filebeat/input/gcs/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package gcs

import (
"encoding/json"
"net/http"
"strings"
"testing"

Expand All @@ -17,123 +15,87 @@ import (

func Test_httpReadJSON(t *testing.T) {
tests := []struct {
name string
body string
wantObjs []mapstr.M
wantStatus int
wantErr bool
wantRawMessage []json.RawMessage
name string
body string
wantObjs []mapstr.M
wantErr bool
}{
{
name: "single object",
body: `{"a": 42, "b": "c"}`,
wantObjs: []mapstr.M{{"a": int64(42), "b": "c"}},
wantStatus: http.StatusOK,
name: "single object",
body: `{"a": 42, "b": "c"}`,
wantObjs: []mapstr.M{{"a": int64(42), "b": "c"}},
},
{
name: "array accepted",
body: `[{"a":"b"},{"c":"d"}]`,
wantObjs: []mapstr.M{{"a": "b"}, {"c": "d"}},
wantStatus: http.StatusOK,
name: "array accepted",
body: `[{"a":"b"},{"c":"d"}]`,
wantObjs: []mapstr.M{{"a": "b"}, {"c": "d"}},
},
{
name: "not an object not accepted",
body: `42`,
wantStatus: http.StatusBadRequest,
wantErr: true,
name: "not an object not accepted",
body: `42`,
wantErr: true,
},
{
name: "not an object mixed",
body: `[{a:1},
42,
{a:2}]`,
wantStatus: http.StatusBadRequest,
wantErr: true,
wantErr: true,
},
{
name: "sequence of objects accepted (CRLF)",
body: `{"a":1}` + "\r" + `{"a":2}`,
wantObjs: []mapstr.M{{"a": int64(1)}, {"a": int64(2)}},
wantStatus: http.StatusOK,
name: "sequence of objects accepted (CRLF)",
body: `{"a":1}` + "\r" + `{"a":2}`,
wantObjs: []mapstr.M{{"a": int64(1)}, {"a": int64(2)}},
},
{
name: "sequence of objects accepted (LF)",
body: `{"a":"1"}
{"a":"2"}`,
wantRawMessage: []json.RawMessage{
[]byte(`{"a":"1"}`),
[]byte(`{"a":"2"}`),
},
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}},
wantStatus: http.StatusOK,
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}},
},
{
name: "sequence of objects accepted (SP)",
body: `{"a":"2"} {"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
wantStatus: http.StatusOK,
name: "sequence of objects accepted (SP)",
body: `{"a":"2"} {"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
},
{
name: "sequence of objects accepted (no separator)",
body: `{"a":"2"}{"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
wantStatus: http.StatusOK,
name: "sequence of objects accepted (no separator)",
body: `{"a":"2"}{"a":"2"}`,
wantObjs: []mapstr.M{{"a": "2"}, {"a": "2"}},
},
{
name: "not an object in sequence",
body: `{"a":"2"}
42
{"a":"2"}`,
wantStatus: http.StatusBadRequest,
wantErr: true,
wantErr: true,
},
{
name: "array of objects in stream",
body: `{"a":"1"} [{"a":"2"},{"a":"3"}] {"a":"4"}`,
wantRawMessage: []json.RawMessage{
[]byte(`{"a":"1"}`),
[]byte(`{"a":"2"}`),
[]byte(`{"a":"3"}`),
[]byte(`{"a":"4"}`),
},
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}, {"a": "3"}, {"a": "4"}},
wantStatus: http.StatusOK,
name: "array of objects in stream",
body: `{"a":"1"} [{"a":"2"},{"a":"3"}] {"a":"4"}`,
wantObjs: []mapstr.M{{"a": "1"}, {"a": "2"}, {"a": "3"}, {"a": "4"}},
},
{
name: "numbers",
body: `{"a":1} [{"a":false},{"a":3.14}] {"a":-4}`,
wantRawMessage: []json.RawMessage{
[]byte(`{"a":1}`),
[]byte(`{"a":false}`),
[]byte(`{"a":3.14}`),
[]byte(`{"a":-4}`),
},
wantObjs: []mapstr.M{
{"a": int64(1)},
{"a": false},
{"a": 3.14},
{"a": int64(-4)},
},
wantStatus: http.StatusOK,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotObjs, rawMessages, gotStatus, err := httpReadJSON(strings.NewReader(tt.body))
gotObjs, err := decodeJSON(strings.NewReader(tt.body))
if (err != nil) != tt.wantErr {
t.Errorf("httpReadJSON() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !assert.EqualValues(t, tt.wantObjs, gotObjs) {
t.Errorf("httpReadJSON() gotObjs = %v, want %v", gotObjs, tt.wantObjs)
}
if gotStatus != tt.wantStatus {
t.Errorf("httpReadJSON() gotStatus = %v, want %v", gotStatus, tt.wantStatus)
}
if tt.wantRawMessage != nil {
assert.Equal(t, tt.wantRawMessage, rawMessages)
}
assert.Equal(t, len(gotObjs), len(rawMessages))
})
}
}
Loading

0 comments on commit 69ebd98

Please sign in to comment.