Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add container input, deprecate docker in favor of it #12162

Merged
merged 13 commits into from
May 20, 2019
2 changes: 1 addition & 1 deletion filebeat/autodiscover/builder/hints/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type config struct {

func defaultConfig() config {
rawCfg := map[string]interface{}{
"type": "docker",
"type": "container",
"containers": map[string]interface{}{
"paths": []string{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use a less docker-specific path here? Or add a runtime option that sets up the proper defaults depending on the runtime?

Copy link
Contributor Author

@exekias exekias May 10, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm currently working on a different PR to make autodiscover more CRI friendly, this is meant to keep it working as of now

// To be able to use this builder with CRI-O replace paths with:
Expand Down
11 changes: 6 additions & 5 deletions filebeat/harvester/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import "github.com/elastic/beats/libbeat/common/match"

// Contains available input types
const (
LogType = "log"
StdinType = "stdin"
RedisType = "redis"
UdpType = "udp"
DockerType = "docker"
LogType = "log"
StdinType = "stdin"
RedisType = "redis"
UdpType = "udp"
exekias marked this conversation as resolved.
Show resolved Hide resolved
exekias marked this conversation as resolved.
Show resolved Hide resolved
exekias marked this conversation as resolved.
Show resolved Hide resolved
DockerType = "docker"
ContainerType = "container"
)

// MatchAny checks if the text matches any of the regular expressions
Expand Down
1 change: 1 addition & 0 deletions filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 35 additions & 0 deletions filebeat/input/container/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package container

var defaultConfig = config{
Stream: "all",
Format: "auto",
}

type config struct {
// Stream can be all, stdout or stderr
Stream string `config:"stream"`

// Fore CRI format (don't perform autodetection)
exekias marked this conversation as resolved.
Show resolved Hide resolved
// TODO remove in favor of format, below
CRIForce bool `config:"cri.force"`

// TODO Format can be auto, cri, json-file
exekias marked this conversation as resolved.
Show resolved Hide resolved
Format string `config:"format"`
}
108 changes: 108 additions & 0 deletions filebeat/input/container/input.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package container

import (
"fmt"
"strings"

"github.com/elastic/beats/filebeat/channel"
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/log"
"github.com/elastic/beats/libbeat/common"

"github.com/pkg/errors"
)

func init() {
err := input.Register("container", NewInput)
if err != nil {
panic(err)
}
}

// NewInput creates a new container input
func NewInput(
cfg *common.Config,
outletFactory channel.Connector,
context input.Context,
) (input.Input, error) {
// Wrap log input with custom docker settings
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "reading container input config")
}

if err := checkStream(config.Stream); err != nil {
return nil, err
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better implement func (c *config) Validate() error . This will trigger validation on Unpack and adds some more context (file, full name of parent namespace including index) to the error message.


// Set partial line joining to true (both json-file and CRI)
if err := cfg.SetBool("docker-json.partial", -1, true); err != nil {
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.cri_flags", -1, true); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Allow stream selection (stdout/stderr/all)
if err := cfg.SetString("docker-json.stream", -1, config.Stream); err != nil {
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.force_cri_logs", -1, config.CRIForce); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Set symlinks to true as CRI-O paths could point to symlinks instead of the actual path.
if err := cfg.SetBool("symlinks", -1, true); err != nil {
return nil, errors.Wrap(err, "update input config")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A many setters can be replaced by a single call to 'Merge':

err := cfg.Merge(common.MapStr{
    "docker-json.partial": true,
    "docker-json.cri_flags": true,
    ...
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for this, I didn't know it could be so simple! :godmode:


// Add stream to meta to ensure different state per stream
if config.Stream != "all" {
if context.Meta == nil {
context.Meta = map[string]string{}
}
context.Meta["stream"] = config.Stream
}

return log.NewInput(cfg, outletFactory, context)
}

func checkStream(val string) error {
exekias marked this conversation as resolved.
Show resolved Hide resolved
for _, s := range []string{"all", "stdout", "stderr"} {
if s == val {
return nil
}
}

return fmt.Errorf("Invalid value for stream: %s, supported values are: all, stdout, stderr", val)
}

func checkFormat(val string) error {
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
val = strings.ToLower(val)
for _, s := range []string{"auto", "docker", "cri"} {
if s == val {
return nil
}
}

return fmt.Errorf("Invalid value for format: %s, supported values are: auto, docker, cri", val)
}
2 changes: 0 additions & 2 deletions filebeat/input/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@ type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

Paths []string `config:"paths"`

// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
42 changes: 10 additions & 32 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/log"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"
Expand All @@ -45,25 +46,18 @@ func NewInput(
) (input.Input, error) {
logger := logp.NewLogger("docker")

cfgwarn.Deprecate("8.0.0", "'docker' input deprecated. Use 'container' input instead.")

// Wrap log input with custom docker settings
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "reading docker input config")
}

// Docker input should make sure that no callers should ever pass empty strings as container IDs or paths
// Docker input should make sure that no callers should ever pass empty strings as container IDs
// Hence we explicitly make sure that we catch such things and print stack traces in the event of
// an invocation so that it can be fixed.
var ids, paths []string
for _, p := range config.Containers.Paths {
if p != "" {
paths = append(paths, p)
} else {
logger.Error("Docker paths can't be empty for Docker input config")
logger.Debugw("Empty path for Docker logfile was received", logp.Stack("stacktrace"))
}
}

var ids []string
for _, containerID := range config.Containers.IDs {
if containerID != "" {
ids = append(ids, containerID)
Expand All @@ -73,23 +67,12 @@ func NewInput(
}
}

if len(ids) == 0 && len(paths) == 0 {
if len(ids) == 0 {
return nil, errors.New("Docker input requires at least one entry under 'containers.ids' or 'containers.paths'")
exekias marked this conversation as resolved.
Show resolved Hide resolved
}

// IDs + Path and Paths are mutually exclusive. Ensure that only one of them are set in a given configuration
if len(ids) != 0 && len(paths) != 0 {
return nil, errors.New("can not provide both 'containers.ids' and 'containers.paths' in the same input config")
}

if len(ids) != 0 {
for idx, containerID := range ids {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}
} else {
for idx, p := range paths {
cfg.SetString("paths", idx, p)
}
for idx, containerID := range ids {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}

if err := checkStream(config.Containers.Stream); err != nil {
Expand All @@ -108,13 +91,8 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}

if err := cfg.SetBool("docker-json.force_cri_logs", -1, config.CRIForce); err != nil {
return nil, errors.Wrap(err, "update input config")
}

if len(paths) != 0 {
// Set symlinks to true as CRI-O paths could point to symlinks instead of the actual path.
if err := cfg.SetBool("symlinks", -1, true); err != nil {
if config.CRIForce {
if err := cfg.SetString("docker-json.format", -1, "cri"); err != nil {
return nil, errors.Wrap(err, "update input config")
}
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type config struct {
DockerJSON *struct {
Stream string `config:"stream"`
Partial bool `config:"partial"`
ForceCRI bool `config:"force_cri_logs"`
Format string `config:"format"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ func (h *Harvester) open() error {
return h.openFile()
case harvester.DockerType:
return h.openFile()
case harvester.ContainerType:
exekias marked this conversation as resolved.
Show resolved Hide resolved
return h.openFile()
default:
return fmt.Errorf("Invalid harvester type: %+v", h.config)
}
Expand Down Expand Up @@ -577,7 +579,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {

if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags)
r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.Format, h.config.DockerJSON.CRIFlags)
}

if h.config.JSON != nil {
Expand Down
27 changes: 15 additions & 12 deletions libbeat/reader/readjson/docker_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"encoding/json"
"runtime"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -38,12 +39,11 @@ type DockerJSONReader struct {
// join partial lines
partial bool

// Force log format: json-file | cri
forceCRI bool

// parse CRI flags
criflags bool

parseLine func(message *reader.Message, msg *logLine) error

stripNewLine func(msg *reader.Message)
}

Expand All @@ -56,15 +56,23 @@ type logLine struct {
}

// New creates a new reader renaming a field
func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool) *DockerJSONReader {
func New(r reader.Reader, stream string, partial bool, format string, CRIFlags bool) *DockerJSONReader {
reader := DockerJSONReader{
stream: stream,
partial: partial,
reader: r,
forceCRI: forceCRI,
criflags: CRIFlags,
}

switch strings.ToLower(format) {
case "docker", "json-file":
reader.parseLine = reader.parseDockerJSONLog
case "cri":
reader.parseLine = reader.parseCRILog
default:
reader.parseLine = reader.parseAuto
}

if runtime.GOOS == "windows" {
reader.stripNewLine = stripNewLineWin
} else {
Expand Down Expand Up @@ -92,7 +100,7 @@ func (p *DockerJSONReader) parseCRILog(message *reader.Message, msg *logLine) er
if len(log) < split {
return errors.New("invalid CRI log format")
}
ts, err := time.Parse(time.RFC3339, string(log[i]))
ts, err := time.Parse(time.RFC3339Nano, string(log[i]))
if err != nil {
return errors.Wrap(err, "parsing CRI timestamp")
}
Expand Down Expand Up @@ -155,12 +163,7 @@ func (p *DockerJSONReader) parseDockerJSONLog(message *reader.Message, msg *logL
return nil
}

func (p *DockerJSONReader) parseLine(message *reader.Message, msg *logLine) error {
if p.forceCRI {
return p.parseCRILog(message, msg)
}

// If froceCRI isn't set, autodetect file type
func (p *DockerJSONReader) parseAuto(message *reader.Message, msg *logLine) error {
if len(message.Content) > 0 && message.Content[0] == '{' {
return p.parseDockerJSONLog(message, msg)
}
Expand Down
Loading