Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(outputs.remotefile): Handle tracking metrics correctly #16289

Merged
merged 2 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion plugins/outputs/remotefile/remotefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ func (f *File) Write(metrics []telegraf.Metric) error {

// Group the metrics per output file
groups := make(map[string][]telegraf.Metric)
for _, m := range metrics {
for _, raw := range metrics {
m := raw
if wm, ok := raw.(telegraf.UnwrappableMetric); ok {
m = wm.Unwrap()
}

for _, tmpl := range f.templates {
buf.Reset()
if err := tmpl.Execute(&buf, m); err != nil {
Expand Down
128 changes: 128 additions & 0 deletions plugins/outputs/remotefile/remotefile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -393,3 +394,130 @@ func TestForgettingFiles(t *testing.T) {
require.Len(t, plugin.serializers, 1)
require.Contains(t, plugin.serializers, "test-b.csv")
}

func TestTrackingMetrics(t *testing.T) {
// see issue #16045
inputRaw := []telegraf.Metric{
metric.New(
"test",
map[string]string{"source": "localhost"},
map[string]interface{}{"value": 23},
time.Unix(1719410465, 0),
),
metric.New(
"test",
map[string]string{"source": "remotehost"},
map[string]interface{}{"value": 21},
time.Unix(1719410465, 0),
),
metric.New(
"test",
map[string]string{"source": "localhost"},
map[string]interface{}{"value": 42},
time.Unix(1719410485, 0),
),
metric.New(
"test",
map[string]string{"source": "remotehost"},
map[string]interface{}{"value": 66},
time.Unix(1719410485, 0),
),
metric.New(
"test",
map[string]string{"source": "remotehost"},
map[string]interface{}{"value": 55},
time.Unix(1716310124, 0),
),
metric.New(
"test",
map[string]string{"source": "remotehost"},
map[string]interface{}{"value": 1},
time.Unix(1716310174, 0),
),
}

// Create tracking metrics as inputs for the test
var mu sync.Mutex
delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
notify := func(di telegraf.DeliveryInfo) {
mu.Lock()
defer mu.Unlock()
delivered = append(delivered, di)
}
input := make([]telegraf.Metric, 0, len(inputRaw))
for _, m := range inputRaw {
tm, _ := metric.WithTracking(m, notify)
input = append(input, tm)
}

// Create the expectations
expected := map[string][]string{
"localhost-2024-06-26": {
"test,source=localhost value=23i 1719410465000000000\n",
"test,source=localhost value=42i 1719410485000000000\n",
},
"remotehost-2024-06-26": {
"test,source=remotehost value=21i 1719410465000000000\n",
"test,source=remotehost value=66i 1719410485000000000\n",
},
"remotehost-2024-05-21": {
"test,source=remotehost value=55i 1716310124000000000\n",
"test,source=remotehost value=1i 1716310174000000000\n",
},
}

// Prepare the output filesystem
tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

// Setup the plugin including the serializer
plugin := &File{
Remote: config.NewSecret([]byte("local:" + tmpdir)),
Files: []string{`{{.Tag "source"}}-{{.Time.Format "2006-01-02"}}`},
WriteBackInterval: config.Duration(100 * time.Millisecond),
Log: &testutil.Logger{},
}

plugin.SetSerializerFunc(func() (telegraf.Serializer, error) {
serializer := &influx.Serializer{}
err := serializer.Init()
return serializer, err
})
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()

// Write the metrics and wait for the data to settle to disk
require.NoError(t, plugin.Write(input))
require.Eventually(t, func() bool {
ok := true
for fn := range expected {
_, err := os.Stat(filepath.Join(tmpdir, fn))
ok = ok && err == nil
}
return ok
}, 5*time.Second, 100*time.Millisecond)

// Check the result
for fn, lines := range expected {
tmpfn := filepath.Join(tmpdir, fn)
require.FileExists(t, tmpfn)

actual, err := os.ReadFile(tmpfn)
require.NoError(t, err)
require.Equal(t, strings.Join(lines, ""), string(actual))
}

// Simulate output acknowledging delivery
for _, m := range input {
m.Accept()
}

// Check delivery
require.Eventuallyf(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(input) == len(delivered)
}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
}
Loading