Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the take_over mode for filestream inputs #34292

Merged
merged 24 commits into from
Jan 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
671aa32
Add the `take_over` mode for filestream inputs
rdner Jan 17, 2023
aea12d8
Add tests
rdner Jan 19, 2023
6dbfb13
Fix ineffective assignment
rdner Jan 19, 2023
a54c00b
Add one more test case for the take over
rdner Jan 19, 2023
0c08156
Add tests for file backups
rdner Jan 19, 2023
e3c577d
Add tests for registry backup
rdner Jan 19, 2023
fea3366
Add a doc section about removing the old config
rdner Jan 19, 2023
f3100c2
Add recovery section to mention backups
rdner Jan 19, 2023
14c8b74
Remove `.to` from the backup suffix since now it's generic
rdner Jan 20, 2023
2e97d7d
Fix the backup filename example in the doc
rdner Jan 20, 2023
2616186
Address PR comments
rdner Jan 24, 2023
ca96523
Merge branch 'main' into filestream-take-over
rdner Jan 24, 2023
a9ead32
Close created files so they can be removed on Windows
rdner Jan 24, 2023
5b65a82
Merge branch 'main' into filestream-take-over
rdner Jan 24, 2023
2fa1e66
Merge branch 'main' into filestream-take-over
rdner Jan 24, 2023
2222dab
Merge branch 'main' into filestream-take-over
rdner Jan 25, 2023
55f00fe
Merge branch 'main' into filestream-take-over
rdner Jan 25, 2023
49f713c
Merge branch 'main' into filestream-take-over
rdner Jan 25, 2023
966bb2e
Apply suggestions from code review
rdner Jan 26, 2023
10308bd
Add the beta disclaimer to the config reference
rdner Jan 26, 2023
4e446d6
Fix wrong indentation
rdner Jan 26, 2023
92a42bb
Update the x-pack Filebeat reference as well
rdner Jan 26, 2023
abcc6b2
Fix tests after accepting review suggestions
rdner Jan 26, 2023
0e26789
Merge branch 'main' into filestream-take-over
rdner Jan 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add support for recognizing the log level in Elasticsearch JVM logs {pull}34159[34159]
- Add metrics for TCP packet processing. {pull}34333[34333]
belimawr marked this conversation as resolved.
Show resolved Hide resolved
- Add metrics for unix socket packet processing. {pull}34335[34335]
- Add beta `take_over` mode for `filestream` for simple migration from `log` inputs {pull}34292[34292]

*Auditbeat*

Expand Down
14 changes: 9 additions & 5 deletions filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ filebeat.inputs:
# Source of container events. Available options: all, stdin, stderr.
#stream: all

# Format of the container events. Available options: auto, cri, docker, json-file
# Format of the container events. Available options: auto, cri, docker, json-file
#format: auto

### Log rotation
Expand All @@ -412,7 +412,7 @@ filebeat.inputs:
#rotation.external.strategy.copytruncate:
# Regex that matches the rotated files.
# suffix_regex: \.\d$
# If the rotated filename suffix is a datetime, set it here.
# If the rotated filename suffix is a datetime, set it here.
# dateformat: -20060102

### State options
Expand Down Expand Up @@ -452,6 +452,11 @@ filebeat.inputs:
# Available options: since_first_start, since_last_start.
#ignore_inactive: ""

# If `take_over` is set to `true`, this `filestream` will take over all files
# from `log` inputs if they match at least one of the `paths` set in the `filestream`.
# This functionality is still in beta.
#take_over: false
rdner marked this conversation as resolved.
Show resolved Hide resolved

# Defines the buffer size every harvester uses when fetching the file
#harvester_buffer_size: 16384

Expand Down Expand Up @@ -609,7 +614,7 @@ filebeat.inputs:
#hosts:
#- kafka-broker-1:9092
#- kafka-broker-2:9092

# A list of topics to read from.
#topics: ["my-topic", "important-logs"]

Expand Down Expand Up @@ -644,7 +649,7 @@ filebeat.inputs:

# The minimum number of bytes to wait for.
#fetch.min: 1

# The default number of bytes to read per request.
#fetch.default: 1MB

Expand Down Expand Up @@ -777,4 +782,3 @@ filebeat.inputs:
#- multiline:
#type: count
#count_lines: 3

47 changes: 47 additions & 0 deletions filebeat/backup/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backup

import (
"errors"
"os"
)

const (
backupSuffix = ".bak"
)

// Backuper defines backup-related operations
type Backuper interface {
	// Backup performs the backup
	Backup() error
	// Remove removes all backups created by this Backuper
	Remove() error
}

// fileExists reports whether a file (or directory) with the given name
// exists. A missing file is not an error: it yields (false, nil). Any other
// stat failure (e.g. permission denied) is returned to the caller.
func fileExists(name string) (bool, error) {
	switch _, err := os.Stat(name); {
	case err == nil:
		return true, nil
	case errors.Is(err, os.ErrNotExist):
		return false, nil
	default:
		return false, err
	}
}
99 changes: 99 additions & 0 deletions filebeat/backup/files.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backup

import (
"fmt"
"io"
"os"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

// NewFileBackuper creates a new backuper that creates backups for the given files.
func NewFileBackuper(log *logp.Logger, files []string) Backuper {
	fb := fileBackuper{
		log:   log,
		files: files,
	}
	return &fb
}

// fileBackuper implements Backuper by copying each configured file to a
// timestamped sibling file with the backup suffix.
type fileBackuper struct {
	log *logp.Logger
	// files to copy when Backup is called
	files []string
	// backup filenames created by Backup so far; consumed by Remove
	backups []string
}

// Backup copies every configured file to a sibling file named
// "<name>-<unixnano><suffix>" and records each created backup so that a
// later Remove call can delete them. It stops at the first file that cannot
// be backed up and returns the error wrapped with the filename.
func (fb *fileBackuper) Backup() error {
	// one scratch buffer shared across all copies to avoid io.Copy's
	// per-call allocation
	buf := make([]byte, 64*1024) // 64KB

	for _, file := range fb.files {
		// the closure scopes the defers so both file handles are closed at
		// the end of each iteration, not at function return
		err := func() error {
			src, err := os.Open(file)
			if err != nil {
				return err
			}
			defer src.Close()

			// we must put the timestamp as a prefix, so after the restart the new backups don't override the previous ones
			backupFilename := fmt.Sprintf("%s-%d%s", file, time.Now().UnixNano(), backupSuffix)
			// O_EXCL guarantees we never clobber an existing backup;
			// the file is always new, so plain O_WRONLY is sufficient
			dst, err := os.OpenFile(backupFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0600)
			if err != nil {
				return err
			}
			defer dst.Close()

			if _, err := io.CopyBuffer(dst, src, buf); err != nil {
				return err
			}

			fb.backups = append(fb.backups, backupFilename)
			return nil
		}()
		if err != nil {
			return fmt.Errorf("failed to backup a file %s: %w", file, err)
		}
	}

	return nil
}

// Remove removes all backups created by this backuper.
// It attempts every removal and reports the failed ones in a single error.
// Uses a pointer receiver for consistency with Backup (a type should not mix
// value and pointer receivers).
func (fb *fileBackuper) Remove() error {
	fb.log.Infof("Removing backup files: %v...", fb.backups)

	var errs []error
	for _, backup := range fb.backups {
		if err := os.Remove(backup); err != nil {
			errs = append(errs, err)
		}
	}

	if len(errs) != 0 {
		return fmt.Errorf("failed to remove some backups: %v", errs)
	}

	return nil
}
91 changes: 91 additions & 0 deletions filebeat/backup/files_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package backup

import (
"os"
"path/filepath"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/stretchr/testify/require"
)

// TestFileBackup exercises the full fileBackuper lifecycle: backup, a second
// backup round, and removal.
// NOTE: the subtests are order-dependent — each one builds on the on-disk
// state and the recorded backup list left by the previous one, so they must
// not be run in isolation or reordered.
func TestFileBackup(t *testing.T) {
	log := logp.NewLogger("backup-test")
	files := createFiles(t, 3)

	backuper := NewFileBackuper(log, files)

	t.Run("creates exactly one backup per given file", func(t *testing.T) {
		err := backuper.Backup()
		require.NoError(t, err)
		requireBackups(t, files, 1)
	})

	t.Run("creates second round of backups", func(t *testing.T) {
		// there is a unix time with nanosecond precision in the filename
		// we can create only one backup per nanosecond
		// if there is already a file created in the same nanosecond, the backup fails
		time.Sleep(time.Microsecond)

		err := backuper.Backup()
		require.NoError(t, err)

		requireBackups(t, files, 2)
	})

	t.Run("removes all created backups", func(t *testing.T) {
		err := backuper.Remove()
		require.NoError(t, err)

		requireBackups(t, files, 0)
	})
}

// createFiles creates `count` temporary files in a fresh temp dir, writes
// each file's own path into it as content (so tests can verify backup
// contents), and returns the created paths.
func createFiles(t *testing.T, count int) (created []string) {
	t.Helper()

	dir := t.TempDir()

	for n := 0; n < count; n++ {
		f, err := os.CreateTemp(dir, "file-*")
		require.NoError(t, err)
		_, err = f.WriteString(f.Name())
		require.NoError(t, err)
		f.Close()
		created = append(created, f.Name())
	}

	return created
}

// requireBackups asserts that every file in `files` has exactly
// `expectedCount` backups on disk (matched by the "<name>-*<suffix>" glob)
// and that each backup's content equals the original file's path — which is
// what createFiles wrote into the originals.
func requireBackups(t *testing.T, files []string, expectedCount int) {
	// mark as a helper so failures are reported at the caller's line,
	// consistent with createFiles
	t.Helper()

	for _, file := range files {
		matches, err := filepath.Glob(file + "-*" + backupSuffix)
		require.NoError(t, err)
		require.Len(t, matches, expectedCount, "expected a different amount of created backups")
		for _, match := range matches {
			content, err := os.ReadFile(match)
			require.NoError(t, err)
			require.Equal(t, file, string(content))
		}
	}
}
Loading