Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for alternative encodings #39

Merged
merged 1 commit into from
Jul 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to capture it this way

- Add support for multiple encodings in the file input plugin

## [0.9.2] - 2020-07-13
### Added
- Link `carbon` into `/usr/local/bin` so it's available on most users' `PATH` ([PR28](https://github.com/observIQ/carbon/pull/28))
Expand Down
15 changes: 15 additions & 0 deletions docs/plugins/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ The `file_input` plugin reads logs from files. It will place the lines read into
| `file_path_field` | | A [field](/docs/types/field.md) that will be set to the path of the file the entry was read from |
| `file_name_field` | | A [field](/docs/types/field.md) that will be set to the name of the file the entry was read from |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `encoding` | `nop` | The encoding of the file being read. See the list of supported encodings below for available options |
| `max_log_size` | 1048576 | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory. |

Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`.
Expand All @@ -27,6 +28,20 @@ If set, the `multiline` configuration block instructs the `file_input` plugin to
The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

### Supported encodings

| Key | Description
| --- | --- |
| `nop` | No encoding validation. Treats the file as a stream of raw bytes |
| `utf-8` | UTF-8 encoding |
| `utf-16le` | UTF-16 encoding with little-endian byte order |
| `utf-16be` | UTF-16 encoding with little-endian byte order |
| `ascii` | ASCII encoding |
| `big5` | The Big5 Chinese character encoding |

Other less common encodings are supported on a best-effort basis. See [https://www.iana.org/assignments/character-sets/character-sets.xhtml](https://www.iana.org/assignments/character-sets/character-sets.xhtml) for other encodings available.


### Example Configurations

#### Simple file input
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
golang.org/x/net v0.0.0-20200301022130-244492dfa37a // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sys v0.0.0-20200513112337-417ce2331b5c // indirect
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20200513201620-d5fe73897c97 // indirect
gonum.org/v1/gonum v0.6.2
google.golang.org/api v0.20.0
Expand Down
51 changes: 46 additions & 5 deletions plugin/builtin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ import (
"os"
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/observiq/carbon/entry"
"github.com/observiq/carbon/plugin"
"github.com/observiq/carbon/plugin/helper"
"go.uber.org/zap"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/ianaindex"
"golang.org/x/text/encoding/unicode"
)

func init() {
Expand All @@ -33,10 +37,11 @@ type InputConfig struct {

PollInterval *plugin.Duration `json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline *MultilineConfig `json:"multiline,omitempty" yaml:"multiline,omitempty"`
FilePathField *entry.Field `json:"file_path_field,omitempty" yaml:"file_path_field,omitempty"`
FilePathField *entry.Field `json:"file_path_field,omitempty" yaml:"file_path_field,omitempty"`
FileNameField *entry.Field `json:"file_name_field,omitempty" yaml:"file_name_field,omitempty"`
StartAt string `json:"start_at,omitempty" yaml:"start_at,omitempty"`
MaxLogSize int `json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
Encoding string `json:"encoding,omitempty" yaml:"encoding,omitempty"`
}

// MultilineConfig is the configuration a multiline operation
Expand Down Expand Up @@ -72,7 +77,12 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
}
}

splitFunc, err := c.getSplitFunc()
encoding, err := lookupEncoding(c.Encoding)
if err != nil {
return nil, err
}

splitFunc, err := c.getSplitFunc(encoding)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -107,6 +117,7 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
fileUpdateChan: make(chan fileUpdateMessage, 10),
fingerprintBytes: 1000,
startAtBeginning: startAtBeginning,
encoding: encoding,
}

if c.MaxLogSize == 0 {
Expand All @@ -118,11 +129,39 @@ func (c InputConfig) Build(context plugin.BuildContext) (plugin.Plugin, error) {
return plugin, nil
}

var encodingOverrides = map[string]encoding.Encoding{
"utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf8": unicode.UTF8,
"ascii": unicode.UTF8,
"us-ascii": unicode.UTF8,
"nop": encoding.Nop,
"": encoding.Nop,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
if encoding, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return encoding, nil
}
encoding, err := ianaindex.IANA.Encoding(enc)
if err != nil {
return nil, fmt.Errorf("unsupported encoding '%s'", enc)
}
if encoding == nil {
return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc)
}
return encoding, nil
}

// getSplitFunc will return the split function associated the configured mode.
func (c InputConfig) getSplitFunc() (bufio.SplitFunc, error) {
func (c InputConfig) getSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
var splitFunc bufio.SplitFunc
if c.Multiline == nil {
splitFunc = NewNewlineSplitFunc()
var err error
splitFunc, err = NewNewlineSplitFunc(encoding)
if err != nil {
return nil, err
}
} else {
definedLineEndPattern := c.Multiline.LineEndPattern != ""
definedLineStartPattern := c.Multiline.LineStartPattern != ""
Expand Down Expand Up @@ -168,6 +207,8 @@ type InputPlugin struct {
fileUpdateChan chan fileUpdateMessage
fingerprintBytes int64

encoding encoding.Encoding

wg *sync.WaitGroup
readerWg *sync.WaitGroup
cancel context.CancelFunc
Expand Down Expand Up @@ -273,7 +314,7 @@ func (f *InputPlugin) checkFile(ctx context.Context, path string, firstCheck boo
go func(ctx context.Context, path string, offset, lastSeenSize int64) {
defer f.readerWg.Done()
messenger := f.newFileUpdateMessenger(path)
err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputPlugin, f.MaxLogSize)
err := ReadToEnd(ctx, path, offset, lastSeenSize, messenger, f.SplitFunc, f.FilePathField, f.FileNameField, f.InputPlugin, f.MaxLogSize, f.encoding)
if err != nil {
f.Warnw("Failed to read log file", zap.Error(err))
}
Expand Down
90 changes: 90 additions & 0 deletions plugin/builtin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
"golang.org/x/text/encoding/unicode"
)

func newTestFileSource(t *testing.T) (*InputPlugin, chan *entry.Entry) {
Expand All @@ -46,6 +47,7 @@ func newTestFileSource(t *testing.T) (*InputPlugin, chan *entry.Entry) {
},
SplitFunc: bufio.ScanLines,
PollInterval: 50 * time.Millisecond,
encoding: unicode.UTF8,
persist: helper.NewScopedDBPersister(db, "testfile"),
runningFiles: make(map[string]struct{}),
knownFiles: make(map[string]*knownFileInfo),
Expand Down Expand Up @@ -746,3 +748,91 @@ func expectNoMessages(t *testing.T, c chan *entry.Entry) {
case <-time.After(200 * time.Millisecond):
}
}

func TestEncodings(t *testing.T) {
t.Parallel()
cases := []struct {
name string
contents []byte
encoding string
expected [][]byte
}{
{
"Nop",
[]byte{0xc5, '\n'},
"",
[][]byte{{0xc5}},
},
{
"InvalidUTFReplacement",
[]byte{0xc5, '\n'},
"utf8",
[][]byte{{0xef, 0xbf, 0xbd}},
},
{
"ValidUTF8",
[]byte("foo\n"),
"utf8",
[][]byte{[]byte("foo")},
},
{
"ChineseCharacter",
[]byte{230, 138, 152, '\n'}, // 折\n
"utf8",
[][]byte{{230, 138, 152}},
},
{
"SmileyFaceUTF16",
[]byte{216, 61, 222, 0, 0, 10}, // 😀\n
"utf-16be",
[][]byte{{240, 159, 152, 128}},
},
{
"SmileyFaceNewlineUTF16",
[]byte{216, 61, 222, 0, 0, 10, 0, 102, 0, 111, 0, 111}, // 😀\nfoo
"utf-16be",
[][]byte{{240, 159, 152, 128}, {102, 111, 111}},
},
{
"SmileyFaceNewlineUTF16LE",
[]byte{61, 216, 0, 222, 10, 0, 102, 0, 111, 0, 111, 0}, // 😀\nfoo
"utf-16le",
[][]byte{{240, 159, 152, 128}, {102, 111, 111}},
},
{
"ChineseCharacterBig5",
[]byte{167, 233, 10}, // 折\n
"big5",
[][]byte{{230, 138, 152}},
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
tempDir := testutil.NewTempDir(t)
path := filepath.Join(tempDir, "in.log")
err := ioutil.WriteFile(path, tc.contents, 0777)
require.NoError(t, err)

source, receivedEntries := newTestFileSource(t)
source.Include = []string{path}
source.encoding, err = lookupEncoding(tc.encoding)
require.NoError(t, err)
source.SplitFunc, err = NewNewlineSplitFunc(source.encoding)
require.NoError(t, err)
require.NotNil(t, source.encoding)

err = source.Start()
require.NoError(t, err)

for _, expected := range tc.expected {
select {
case entry := <-receivedEntries:
require.Equal(t, expected, []byte(entry.Record.(string)))
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for entry to be read")
}
}
})
}
}
35 changes: 25 additions & 10 deletions plugin/builtin/input/file/line_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bufio"
"bytes"
"regexp"

"golang.org/x/text/encoding"
)

// NewLineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
Expand Down Expand Up @@ -67,27 +69,40 @@ func NewLineEndSplitFunc(re *regexp.Regexp) bufio.SplitFunc {

// NewNewlineSplitFunc splits log lines by newline, just as bufio.ScanLines, but
// never returning an token using EOF as a terminator
func NewNewlineSplitFunc() bufio.SplitFunc {
func NewNewlineSplitFunc(encoding encoding.Encoding) (bufio.SplitFunc, error) {
newline, err := encodedNewline(encoding)
if err != nil {
return nil, err
}

carriageReturn, err := encodedCarriageReturn(encoding)
if err != nil {
return nil, err
}

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

if i := bytes.IndexByte(data, '\n'); i >= 0 {
if i := bytes.Index(data, newline); i >= 0 {
// We have a full newline-terminated line.
return i + 1, dropCR(data[0:i]), nil
return i + len(newline), bytes.TrimSuffix(data[:i], carriageReturn), nil
}

// Request more data.
return 0, nil, nil
}
}, nil
}

// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
if len(data) > 0 && data[len(data)-1] == '\r' {
return data[0 : len(data)-1]
}
func encodedNewline(encoding encoding.Encoding) ([]byte, error) {
out := make([]byte, 10)
nDst, _, err := encoding.NewEncoder().Transform(out, []byte{'\n'}, true)
return out[:nDst], err
}

return data
func encodedCarriageReturn(encoding encoding.Encoding) ([]byte, error) {
out := make([]byte, 10)
nDst, _, err := encoding.NewEncoder().Transform(out, []byte{'\r'}, true)
return out[:nDst], err
}
Loading