Skip to content

Commit

Permalink
[v2] Add event size limit (elastic#1352)
Browse files Browse the repository at this point in the history
Default limit is set to 300kb.
  • Loading branch information
roncohen authored and Ron cohen committed Oct 16, 2018
1 parent f1937f5 commit 757182c
Show file tree
Hide file tree
Showing 25 changed files with 470 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 1,
"errors": [
{
"document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981, \"span_count\": { \"started\": 21 } } } \n",
"document": "{ \"transaction\": { \"id\": 12345, \"trace_id\": \"0123456789abcdef0123456789abcdef\", \"parent_id\": \"abcdefabcdef01234567\", \"type\": \"request\", \"duration\": 32.592981, \"span_count\": { \"started\": 21 } } } ",
"message": "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"transaction#\"\n I[#] S[#/allOf/1] allOf failed\n I[#/id] S[#/allOf/1/properties/id/type] expected string, but got number"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 1,
"errors": [
{
"document": "{ \"invalid-json\" }\n",
"document": "{ \"invalid-json\" }",
"message": "data read error: invalid character '}' after object key"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 0,
"errors": [
{
"document": "{\"metadata\": {\"invalid-json\"}}\n",
"document": "{\"metadata\": {\"invalid-json\"}}",
"message": "data read error: invalid character '}' after object key"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 0,
"errors": [
{
"document": "{\"metadata\": {\"user\": null}}\n",
"document": "{\"metadata\": {\"user\": null}}",
"message": "Problem validating JSON document against schema: I[#] S[#] doesn't validate with \"metadata#\"\n I[#] S[#/required] missing properties: \"service\""
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 0,
"errors": [
{
"document": "{\"not\": \"metadata\"}\n",
"document": "{\"not\": \"metadata\"}",
"message": "did not recognize object type"
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"accepted": 0,
"errors": [
{
"document": "{\"tennis-court\": {\"name\": \"Centre Court, Wimbledon\"}}\n",
"document": "{\"tennis-court\": {\"name\": \"Centre Court, Wimbledon\"}}",
"message": "did not recognize object type"
}
]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"accepted": 0,
"errors": [
{
"document": "{\"metadata\": {\"user\"",
"message": "event exceeded the permitted size."
}
]
}
3 changes: 3 additions & 0 deletions beater/beater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestBeatConfig(t *testing.T) {
"max_unzipped_size": 64,
"max_request_queue_time": 9 * time.Second,
"max_header_size": 8,
"max_event_size": 100,
"read_timeout": 3 * time.Second,
"write_timeout": 4 * time.Second,
"shutdown_timeout": 9 * time.Second,
Expand Down Expand Up @@ -115,6 +116,7 @@ func TestBeatConfig(t *testing.T) {
MaxUnzippedSize: 64,
MaxRequestQueueTime: 9 * time.Second,
MaxHeaderSize: 8,
MaxEventSize: 100,
ReadTimeout: 3000000000,
WriteTimeout: 4000000000,
ShutdownTimeout: 9000000000,
Expand Down Expand Up @@ -208,6 +210,7 @@ func TestBeatConfig(t *testing.T) {
MaxUnzippedSize: 64,
MaxRequestQueueTime: 2 * time.Second,
MaxHeaderSize: 1048576,
MaxEventSize: 307200,
ReadTimeout: 30000000000,
WriteTimeout: 30000000000,
ShutdownTimeout: 5000000000,
Expand Down
2 changes: 2 additions & 0 deletions beater/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Config struct {
MaxHeaderSize int `config:"max_header_size"`
ReadTimeout time.Duration `config:"read_timeout"`
WriteTimeout time.Duration `config:"write_timeout"`
MaxEventSize int `config:"max_event_size"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
SecretToken string `config:"secret_token"`
SSL *SSLConfig `config:"ssl"`
Expand Down Expand Up @@ -235,6 +236,7 @@ func defaultConfig(beatVersion string) *Config {
MaxRequestQueueTime: 2 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxEventSize: 300 * 1024, // 300 kb
ShutdownTimeout: 5 * time.Second,
SecretToken: "",
AugmentEnabled: true,
Expand Down
10 changes: 5 additions & 5 deletions beater/v2_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type v2Handler struct {

func (v *v2Handler) statusCode(sr *stream.Result) int {
var code int
higestCode := http.StatusAccepted
highestCode := http.StatusAccepted
for _, err := range sr.Errors {
switch err.Type {
case stream.InvalidInputErrType:
Expand All @@ -56,11 +56,11 @@ func (v *v2Handler) statusCode(sr *stream.Result) int {
default:
code = http.StatusInternalServerError
}
if code > higestCode {
higestCode = code
if code > highestCode {
highestCode = code
}
}
return higestCode
return highestCode
}

func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr *stream.Result) {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (v *v2Handler) sendResponse(logger *logp.Logger, w http.ResponseWriter, sr
func (v *v2Handler) Handle(beaterConfig *Config, report publish.Reporter) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger := requestLogger(r)
ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxUnzippedSize)
ndReader, err := decoder.NDJSONStreamDecodeCompressedWithLimit(r, beaterConfig.MaxEventSize)
if err != nil {
// if we can't set up the ndjsonreader,
// we won't be able to make sense of the body
Expand Down
46 changes: 45 additions & 1 deletion beater/v2_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func TestRequestDecoderError(t *testing.T) {
}

func TestRequestIntegration(t *testing.T) {

for _, test := range []struct {
name string
code int
Expand Down Expand Up @@ -137,3 +136,48 @@ func TestRequestIntegration(t *testing.T) {
})
}
}

func TestV2LineExceeded(t *testing.T) {
b, err := loader.LoadDataAsBytes("../testdata/intake-v2/transactions.ndjson")
require.NoError(t, err)

lineLimitExceededInTestData := func(lineLimit int) bool {
var limitExceeded bool
for _, l := range bytes.Split(b, []byte("\n")) {
if len(l) > lineLimit {
limitExceeded = true
break
}
}
return limitExceeded
}

req := httptest.NewRequest("POST", "/v2/intake", bytes.NewBuffer(b))
req.Header.Add("Content-Type", "application/x-ndjson")

w := httptest.NewRecorder()

report := func(ctx context.Context, p publish.PendingReq) error {
return nil
}

c := defaultConfig("7.0.0")
assert.False(t, lineLimitExceededInTestData(c.MaxEventSize))
handler := (&v2BackendRoute).Handler(c, report)
handler.ServeHTTP(w, req)

assert.Equal(t, http.StatusAccepted, w.Code, w.Body.String())
assert.Equal(t, 0, w.Body.Len())

c.MaxEventSize = 20
assert.True(t, lineLimitExceededInTestData(c.MaxEventSize))
handler = (&v2BackendRoute).Handler(c, report)

req = httptest.NewRequest("POST", "/v2/intake", bytes.NewBuffer(b))
req.Header.Add("Content-Type", "application/x-ndjson")
w = httptest.NewRecorder()

handler.ServeHTTP(w, req)
assert.Equal(t, http.StatusBadRequest, w.Code, w.Body.String())
tests.AssertApproveResult(t, "approved-stream-result/TestV2LineExceeded", w.Body.Bytes())
}
73 changes: 73 additions & 0 deletions decoder/line_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package decoder

import (
"bufio"
"io"

"github.com/pkg/errors"
)

var ErrLineTooLong = errors.New("Line exceeded permitted length")

// LineReader reads length-limited lines from streams using a limited amount of memory.
type LineReader struct {
reader io.Reader
br *bufio.Reader
maxLineLength int
skip bool
}

func NewLineReader(reader io.Reader, maxLineLength int) *LineReader {
return &LineReader{
reader: reader,
br: bufio.NewReaderSize(reader, maxLineLength),
maxLineLength: maxLineLength,
}
}

// ReadLine reads the next line from the given reader.
// If it encounters a line that is longer than `maxLineLength` it will
// return the first `maxLineLength` bytes with `ErrLineTooLong`. On the next
// call it will return the next line.
func (lr *LineReader) ReadLine() ([]byte, error) {
for {
prefix := false
line, err := lr.br.ReadSlice('\n')
if err == bufio.ErrBufferFull {
prefix = true
}

if !lr.skip {
if prefix {
lr.skip = true
return line[:lr.maxLineLength], ErrLineTooLong
}

if len(line) > 0 && line[len(line)-1] == '\n' {
line = line[:len(line)-1]
}
return line, err
} else if err == io.EOF {
return nil, io.EOF
} else if !prefix {
lr.skip = false
}
}
}
Loading

0 comments on commit 757182c

Please sign in to comment.