Skip to content

Commit

Permalink
Add the take_over mode for filestream inputs (#34292)
Browse files Browse the repository at this point in the history
If a `filestream` input has the configuration parameter `take_over`
set to `true`, every `loginput` state record (in the registry) with
the `source` that matches at least one of the `filestream`'s
paths/globs will be taken over by this `filestream` input.

This means the existing `loginput` state entry gets converted into a
`filestream` entry (the `loginput` entry gets deleted).

The purpose of this mode is to make migration from `loginput` to
`filestream` as simple and smooth as possible by adding `take_over:
true` to the new `filestream` configuration. All offsets for input
files will be preserved and the `filestream` will continue ingesting
the files at the same point where the `loginput` stopped. This solves
the previously occurring data duplication (file re-ingestion) problem.
  • Loading branch information
rdner authored Jan 27, 2023
1 parent 762ad18 commit 06c4856
Show file tree
Hide file tree
Showing 17 changed files with 1,255 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159]
- Add metrics for TCP packet processing. {pull}34333[34333]
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]

*Auditbeat*

Expand Down
14 changes: 9 additions & 5 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ filebeat.inputs:
# Source of container events. Available options: all, stdin, stderr.
#stream: all

# Format of the container events. Available options: auto, cri, docker, json-file
# Format of the container events. Available options: auto, cri, docker, json-file
#format: auto

### Log rotation
Expand All @@ -412,7 +412,7 @@ filebeat.inputs:
#rotation.external.strategy.copytruncate:
# Regex that matches the rotated files.
# suffix_regex: \.\d$
# If the rotated filename suffix is a datetime, set it here.
# If the rotated filename suffix is a datetime, set it here.
# dateformat: -20060102

### State options
Expand Down Expand Up @@ -452,6 +452,11 @@ filebeat.inputs:
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# If `take_over` is set to `true`, this `filestream` will take over all files
# from `log` inputs if they match at least one of the `paths` set in the `filestream`.
# This functionality is still in beta.
#take_over: false

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down Expand Up @@ -609,7 +614,7 @@ filebeat.inputs:
#hosts:
#- kafka-broker-1:9092
#- kafka-broker-2:9092

# A list of topics to read from.
#topics: ["my-topic", "important-logs"]

Expand Down Expand Up @@ -644,7 +649,7 @@ filebeat.inputs:

# The minimum number of bytes to wait for.
#fetch.min: 1

# The default number of bytes to read per request.
#fetch.default: 1MB

Expand Down Expand Up @@ -777,4 +782,3 @@ filebeat.inputs:
#- multiline:
#type: count
#count_lines: 3

47 changes: 47 additions & 0 deletions filebeat/backup/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backup

import (
"errors"
"os"
)

const (
// backupSuffix is the trailing part of every backup file name produced by
// this package; it is also what identifies a file as a backup.
backupSuffix = ".bak"
)

// Backuper defines backup-related operations.
type Backuper interface {
// Backup performs the backup and reports any failure as an error.
Backup() error
// Remove removes all backups created by this backuper.
Remove() error
}

// fileExists reports whether something exists at the given path.
//
// A "does not exist" result from os.Stat is not treated as a failure and
// yields (false, nil). Any other stat error is returned unchanged together
// with false.
func fileExists(name string) (bool, error) {
	_, statErr := os.Stat(name)
	switch {
	case statErr == nil:
		return true, nil
	case errors.Is(statErr, os.ErrNotExist):
		return false, nil
	default:
		return false, statErr
	}
}
99 changes: 99 additions & 0 deletions filebeat/backup/files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backup

import (
"fmt"
"io"
"os"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

// NewFileBackuper returns a Backuper that makes backup copies of the given
// files and logs through the given logger.
func NewFileBackuper(log *logp.Logger, files []string) Backuper {
	fb := new(fileBackuper)
	fb.log = log
	fb.files = files
	return fb
}

// fileBackuper is the file-based Backuper: it copies each configured file to
// a backup file and remembers the backups it created.
type fileBackuper struct {
// log is the logger used to report backup removal.
log *logp.Logger
// files are the paths of the files to back up.
files []string
// backups are the paths of the backup files created so far by Backup.
backups []string
}

// Backup creates a backup copy of every file given to this backuper.
//
// Each backup is written next to its source file and is named after it: the
// source path, a dash, the current Unix time in nanoseconds, then the backup
// suffix. Successfully created backups are remembered so that Remove can
// delete them later.
//
// Backup stops at the first file that cannot be backed up and returns an
// error naming that file. Backups created before the failure are kept and
// stay tracked; a partially written backup of the failing file is deleted.
func (fb *fileBackuper) Backup() error {
	buf := make([]byte, 64*1024) // 64KB copy buffer reused for every file

	for _, file := range fb.files {
		// The closure scopes the deferred close of the source file to a
		// single iteration instead of the whole method.
		err := func() error {
			src, err := os.Open(file)
			if err != nil {
				return err
			}
			defer src.Close()

			// we must put the timestamp as a suffix of the file name, so after
			// the restart the new backups don't override the previous ones
			backupFilename := fmt.Sprintf("%s-%d%s", file, time.Now().UnixNano(), backupSuffix)
			// O_EXCL makes the open fail rather than overwrite an existing backup.
			dst, err := os.OpenFile(backupFilename, os.O_CREATE|os.O_EXCL|os.O_APPEND|os.O_WRONLY, 0600)
			if err != nil {
				return err
			}

			_, err = io.CopyBuffer(dst, src, buf)
			// A failed close can mean the data never reached the disk, so it
			// counts as a backup failure unless the copy already failed.
			if closeErr := dst.Close(); err == nil {
				err = closeErr
			}
			if err != nil {
				// Best-effort cleanup: this file was created above and is not
				// tracked in fb.backups, so nothing else would ever remove it.
				_ = os.Remove(backupFilename)
				return err
			}

			fb.backups = append(fb.backups, backupFilename)
			return nil
		}()
		if err != nil {
			return fmt.Errorf("failed to backup a file %s: %w", file, err)
		}
	}

	return nil
}

// Remove removes all backups created by this backuper.
//
// Removal is attempted for every tracked backup even if some attempts fail.
// Backups that were removed are forgotten; the ones that could not be removed
// stay tracked, so a later call retries only those. The returned error lists
// every failed removal, or is nil when all backups were removed.
func (fb *fileBackuper) Remove() error {
	fb.log.Infof("Removing backup files: %v...", fb.backups)

	var (
		errs      []error
		remaining []string
	)
	for _, backup := range fb.backups {
		if err := os.Remove(backup); err != nil {
			errs = append(errs, err)
			remaining = append(remaining, backup)
		}
	}
	// Drop the removed backups so that a repeated call does not fail on
	// files that are already gone.
	fb.backups = remaining

	if len(errs) != 0 {
		return fmt.Errorf("failed to remove some backups: %v", errs)
	}

	return nil
}
91 changes: 91 additions & 0 deletions filebeat/backup/files_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backup

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/require"
)

// TestFileBackup runs two rounds of backups over the same set of files and
// then removes them. The subtests share one backuper and depend on running in
// the order they are declared.
func TestFileBackup(t *testing.T) {
	logger := logp.NewLogger("backup-test")
	paths := createFiles(t, 3)
	fb := NewFileBackuper(logger, paths)

	t.Run("creates exactly one backup per given file", func(t *testing.T) {
		require.NoError(t, fb.Backup())
		requireBackups(t, paths, 1)
	})

	t.Run("creates second round of backups", func(t *testing.T) {
		// Backup file names embed the Unix time in nanoseconds, so two backups
		// of the same file cannot be created within the same nanosecond: the
		// second one would fail. Wait a little to get a different timestamp.
		time.Sleep(time.Microsecond)

		require.NoError(t, fb.Backup())
		requireBackups(t, paths, 2)
	})

	t.Run("removes all created backups", func(t *testing.T) {
		require.NoError(t, fb.Remove())
		requireBackups(t, paths, 0)
	})
}

// createFiles creates count files in a fresh temporary directory of the test
// and returns their paths. Each file contains its own path, which lets a test
// verify that a copy of the file belongs to it.
func createFiles(t *testing.T, count int) (created []string) {
	t.Helper()

	tmp := t.TempDir()

	for i := 0; i < count; i++ {
		file, err := os.CreateTemp(tmp, "file-*")
		require.NoError(t, err)
		_, err = file.WriteString(file.Name())
		require.NoError(t, err)
		// A failed close may mean the content was not fully written, which
		// would break the content checks done on the backups later.
		require.NoError(t, file.Close())
		created = append(created, file.Name())
	}

	return created
}

// requireBackups asserts that each of the given files has exactly
// expectedCount backups next to it and that the content of every backup is
// the path of its source file.
func requireBackups(t *testing.T, files []string, expectedCount int) {
	// Attribute assertion failures to the calling test, not to this helper.
	t.Helper()

	for _, file := range files {
		matches, err := filepath.Glob(file + "-*" + backupSuffix)
		require.NoError(t, err)
		require.Len(t, matches, expectedCount, "expected a different amount of created backups")
		for _, match := range matches {
			content, err := os.ReadFile(match)
			require.NoError(t, err)
			require.Equal(t, file, string(content))
		}
	}
}
Loading

0 comments on commit 06c4856

Please sign in to comment.