diff --git a/plugins/outputs/remotefile/remotefile.go b/plugins/outputs/remotefile/remotefile.go index 9db5c039368d4..a30badf10c6c6 100644 --- a/plugins/outputs/remotefile/remotefile.go +++ b/plugins/outputs/remotefile/remotefile.go @@ -177,7 +177,12 @@ func (f *File) Write(metrics []telegraf.Metric) error { // Group the metrics per output file groups := make(map[string][]telegraf.Metric) - for _, m := range metrics { + for _, raw := range metrics { + m := raw + if wm, ok := raw.(telegraf.UnwrappableMetric); ok { + m = wm.Unwrap() + } + for _, tmpl := range f.templates { buf.Reset() if err := tmpl.Execute(&buf, m); err != nil { diff --git a/plugins/outputs/remotefile/remotefile_test.go b/plugins/outputs/remotefile/remotefile_test.go index 70ba9a919167e..cdd5b51617967 100644 --- a/plugins/outputs/remotefile/remotefile_test.go +++ b/plugins/outputs/remotefile/remotefile_test.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -393,3 +394,130 @@ func TestForgettingFiles(t *testing.T) { require.Len(t, plugin.serializers, 1) require.Contains(t, plugin.serializers, "test-b.csv") } + +func TestTrackingMetrics(t *testing.T) { + // see issue #16045 + inputRaw := []telegraf.Metric{ + metric.New( + "test", + map[string]string{"source": "localhost"}, + map[string]interface{}{"value": 23}, + time.Unix(1719410465, 0), + ), + metric.New( + "test", + map[string]string{"source": "remotehost"}, + map[string]interface{}{"value": 21}, + time.Unix(1719410465, 0), + ), + metric.New( + "test", + map[string]string{"source": "localhost"}, + map[string]interface{}{"value": 42}, + time.Unix(1719410485, 0), + ), + metric.New( + "test", + map[string]string{"source": "remotehost"}, + map[string]interface{}{"value": 66}, + time.Unix(1719410485, 0), + ), + metric.New( + "test", + map[string]string{"source": "remotehost"}, + map[string]interface{}{"value": 55}, + time.Unix(1716310124, 0), + ), + metric.New( + "test", + map[string]string{"source": "remotehost"}, + map[string]interface{}{"value": 1}, + time.Unix(1716310174, 0), + ), + } + + // Create tracking metrics as inputs for the test + var mu sync.Mutex + delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw)) + notify := func(di telegraf.DeliveryInfo) { + mu.Lock() + defer mu.Unlock() + delivered = append(delivered, di) + } + input := make([]telegraf.Metric, 0, len(inputRaw)) + for _, m := range inputRaw { + tm, _ := metric.WithTracking(m, notify) + input = append(input, tm) + } + + // Create the expectations + expected := map[string][]string{ + "localhost-2024-06-26": { + "test,source=localhost value=23i 1719410465000000000\n", + "test,source=localhost value=42i 1719410485000000000\n", + }, + "remotehost-2024-06-26": { + "test,source=remotehost value=21i 1719410465000000000\n", + "test,source=remotehost value=66i 1719410485000000000\n", + }, + "remotehost-2024-05-21": { + "test,source=remotehost value=55i 1716310124000000000\n", + "test,source=remotehost value=1i 1716310174000000000\n", + }, + } + + // Prepare the output filesystem + tmpdir, err := os.MkdirTemp("", "telegraf-remotefile-*") + require.NoError(t, err) + defer os.RemoveAll(tmpdir) + + // Setup the plugin including the serializer + plugin := &File{ + Remote: config.NewSecret([]byte("local:" + tmpdir)), + Files: []string{`{{.Tag "source"}}-{{.Time.Format "2006-01-02"}}`}, + WriteBackInterval: config.Duration(100 * time.Millisecond), + Log: &testutil.Logger{}, + } + + plugin.SetSerializerFunc(func() (telegraf.Serializer, error) { + serializer := &influx.Serializer{} + err := serializer.Init() + return serializer, err + }) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() + + // Write the metrics and wait for the data to settle to disk + require.NoError(t, plugin.Write(input)) + require.Eventually(t, func() bool { + ok := true + for fn := range expected { + _, err := os.Stat(filepath.Join(tmpdir, fn)) + ok = ok && err == nil + } + return ok + }, 5*time.Second, 100*time.Millisecond) + + // Check the result + for fn, lines := range expected { + tmpfn := filepath.Join(tmpdir, fn) + require.FileExists(t, tmpfn) + + actual, err := os.ReadFile(tmpfn) + require.NoError(t, err) + require.Equal(t, strings.Join(lines, ""), string(actual)) + } + + // Simulate output acknowledging delivery + for _, m := range input { + m.Accept() + } + + // Check delivery + require.Eventuallyf(t, func() bool { + mu.Lock() + defer mu.Unlock() + return len(input) == len(delivered) + }, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected)) +}