Skip to content

Commit

Permalink
refactoring to eliminate unnecessary copy && add flag
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Apr 1, 2019
1 parent 40d5258 commit 991a4da
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 21 deletions.
46 changes: 29 additions & 17 deletions libbeat/processors/actions/truncate_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type truncateFields struct {
truncate truncater
}

type truncater func(*truncateFields, []byte) ([]byte, error)
type truncater func(*truncateFields, []byte) ([]byte, bool, error)

func init() {
processors.RegisterPlugin("truncate_fields",
Expand Down Expand Up @@ -99,80 +99,92 @@ func (f *truncateFields) truncateSingleField(field string, event *beat.Event) (*
if f.config.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound {
return event, nil
}
return event, fmt.Errorf("could not fetch value for key: %s, Error: %+v", field, err)
return event, errors.Wrapf(err, "could not fetch value for key: %s", field)
}

switch value := v.(type) {
case string:
return f.addTruncatedString(field, value, event)
case []byte:
return f.addTruncatedByte(field, value, event)
case string:
return f.addTruncatedString(field, value, event)
default:
return event, fmt.Errorf("value cannot be truncated: %+v", value)
}

}

func (f *truncateFields) addTruncatedString(field, value string, event *beat.Event) (*beat.Event, error) {
truncated, err := f.truncate(f, []byte(value))
truncated, isTruncated, err := f.truncate(f, []byte(value))
if err != nil {
return event, err
}
_, err = event.PutValue(field, string(truncated))
if err != nil {
return event, fmt.Errorf("could not add truncated string value for key: %s, Error: %+v", field, err)
}

if isTruncated {
common.AddTagsWithKey(event.Fields, "log.flags", []string{"truncated"})
}

return event, nil
}

func (f *truncateFields) addTruncatedByte(field string, value []byte, event *beat.Event) (*beat.Event, error) {
truncated, err := f.truncate(f, value)
truncated, isTruncated, err := f.truncate(f, value)
if err != nil {
return event, err
}
_, err = event.PutValue(field, truncated)
if err != nil {
return event, fmt.Errorf("could not add truncated byte slice value for key: %s, Error: %+v", field, err)
}

if isTruncated {
common.AddTagsWithKey(event.Fields, "log.flags", []string{"truncated"})
}

return event, nil
}

func (f *truncateFields) truncateBytes(value []byte) ([]byte, error) {
func (f *truncateFields) truncateBytes(value []byte) ([]byte, bool, error) {
size := len(value)
if size > f.config.MaxBytes {
size = f.config.MaxBytes
if size <= f.config.MaxBytes {
return value, false, nil
}

size = f.config.MaxBytes
truncated := make([]byte, size)
n := copy(truncated, value[:size])
if n != size {
return nil, fmt.Errorf("unexpected number of bytes were copied")
return nil, false, fmt.Errorf("unexpected number of bytes were copied")
}
return truncated, nil
return truncated, true, nil
}

func (f *truncateFields) truncateCharacters(value []byte) ([]byte, error) {
func (f *truncateFields) truncateCharacters(value []byte) ([]byte, bool, error) {
count := utf8.RuneCount(value)
if count > f.config.MaxChars {
count = f.config.MaxChars
if count <= f.config.MaxChars {
return value, false, nil
}

count = f.config.MaxChars
r := bytes.NewReader(value)
w := bytes.NewBuffer(nil)

for i := 0; i < count; i++ {
r, _, err := r.ReadRune()
if err != nil {
return nil, err
return nil, false, err
}

_, err = w.WriteRune(r)
if err != nil {
return nil, err
return nil, false, err
}
}

return w.Bytes(), nil
return w.Bytes(), true, nil
}

func (f *truncateFields) String() string {
Expand Down
20 changes: 16 additions & 4 deletions libbeat/processors/actions/truncate_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestTruncateFields(t *testing.T) {

var tests = map[string]struct {
MaxBytes int
MaxCount int
MaxChars int
Input common.MapStr
Output common.MapStr
ShouldError bool
Expand All @@ -43,6 +43,9 @@ func TestTruncateFields(t *testing.T) {
},
Output: common.MapStr{
"message": "too",
"log": common.MapStr{
"flags": []string{"truncated"},
},
},
ShouldError: false,
TruncateFunc: (*truncateFields).truncateBytes,
Expand All @@ -54,6 +57,9 @@ func TestTruncateFields(t *testing.T) {
},
Output: common.MapStr{
"message": []byte("too"),
"log": common.MapStr{
"flags": []string{"truncated"},
},
},
ShouldError: false,
TruncateFunc: (*truncateFields).truncateBytes,
Expand Down Expand Up @@ -92,7 +98,7 @@ func TestTruncateFields(t *testing.T) {
TruncateFunc: (*truncateFields).truncateBytes,
},
"do not truncate characters of short byte line": {
MaxCount: 6,
MaxChars: 6,
Input: common.MapStr{
"message": []byte("ez jó"), // this is good (hungarian)
},
Expand All @@ -114,12 +120,15 @@ func TestTruncateFields(t *testing.T) {
TruncateFunc: (*truncateFields).truncateBytes,
},
"truncate characters of too long byte line": {
MaxCount: 10,
MaxChars: 10,
Input: common.MapStr{
"message": []byte("ez egy túl hosszú sor"), // this is a too long line (hungarian)
},
Output: common.MapStr{
"message": []byte("ez egy túl"), // this is a too (hungarian)
"log": common.MapStr{
"flags": []string{"truncated"},
},
},
ShouldError: false,
TruncateFunc: (*truncateFields).truncateCharacters,
Expand All @@ -131,6 +140,9 @@ func TestTruncateFields(t *testing.T) {
},
Output: common.MapStr{
"message": []byte("ez egy tú"), // this is a "to" (hungarian)
"log": common.MapStr{
"flags": []string{"truncated"},
},
},
ShouldError: false,
TruncateFunc: (*truncateFields).truncateBytes,
Expand All @@ -143,7 +155,7 @@ func TestTruncateFields(t *testing.T) {
config: truncateFieldsConfig{
Fields: []string{"message"},
MaxBytes: test.MaxBytes,
MaxCount: test.MaxCount,
MaxChars: test.MaxChars,
FailOnError: true,
},
truncate: test.TruncateFunc,
Expand Down

0 comments on commit 991a4da

Please sign in to comment.