From 3afd32d75e555ec2384ac2e8cb14da2ebad25701 Mon Sep 17 00:00:00 2001 From: Dane Strandboge <136023093+DStrand1@users.noreply.github.com> Date: Tue, 25 Jun 2024 15:13:42 -0500 Subject: [PATCH] feat: metric disk buffer --- accumulator.go | 4 + config/config.go | 5 + docs/LICENSE_OF_DEPENDENCIES.md | 2 + go.mod | 2 + go.sum | 7 + metric.go | 1 + metric/deserialize.go | 78 +++++++ metric/init.go | 7 + metric/tracking.go | 60 +++-- models/buffer.go | 18 +- models/buffer_disk.go | 222 +++++++++++++++++++ models/buffer_disk_test.go | 95 ++++++++ models/buffer_mem_test.go | 15 ++ models/buffer_suite_test.go | 24 +- models/testdata/testwal/00000000000000000001 | Bin 0 -> 1812 bytes 15 files changed, 498 insertions(+), 42 deletions(-) create mode 100644 metric/deserialize.go create mode 100644 metric/init.go create mode 100644 models/buffer_disk.go create mode 100644 models/buffer_disk_test.go create mode 100644 models/testdata/testwal/00000000000000000001 diff --git a/accumulator.go b/accumulator.go index ece69f3b176d1..4991752bb2d94 100644 --- a/accumulator.go +++ b/accumulator.go @@ -57,6 +57,10 @@ type Accumulator interface { // TrackingID uniquely identifies a tracked metric group type TrackingID uint64 +type TrackingData interface { + ID() TrackingID +} + // DeliveryInfo provides the results of a delivered metric group. type DeliveryInfo interface { // ID is the TrackingID diff --git a/config/config.go b/config/config.go index f0ccb0a80f507..87605890c5586 100644 --- a/config/config.go +++ b/config/config.go @@ -278,6 +278,9 @@ type AgentConfig struct { // Number of attempts to obtain a remote configuration via a URL during // startup. Set to -1 for unlimited attempts. ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"` + + BufferStrategy string `toml:"buffer_strategy"` + BufferDirectory string `toml:"buffer_directory"` } // InputNames returns a list of strings of the configured inputs. @@ -1521,6 +1524,8 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig, c.getFieldString(tbl, "name_suffix", &oc.NameSuffix) c.getFieldString(tbl, "name_prefix", &oc.NamePrefix) c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior) + c.getFieldString(tbl, "buffer_strategy", &oc.BufferStrategy) + c.getFieldString(tbl, "buffer_directory", &oc.BufferDirectory) if c.hasErrs() { return nil, c.firstErr() diff --git a/docs/LICENSE_OF_DEPENDENCIES.md b/docs/LICENSE_OF_DEPENDENCIES.md index 4d3f189483269..06928cc12126f 100644 --- a/docs/LICENSE_OF_DEPENDENCIES.md +++ b/docs/LICENSE_OF_DEPENDENCIES.md @@ -349,6 +349,8 @@ following works: - github.com/tidwall/gjson [MIT License](https://github.com/tidwall/gjson/blob/master/LICENSE) - github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE) - github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE) +- github.com/tidwall/tinylru [MIT License](https://github.com/tidwall/tinylru/blob/master/LICENSE) +- github.com/tidwall/wal [MIT License](https://github.com/tidwall/wal/blob/master/LICENSE) - github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE) - github.com/tklauser/go-sysconf [BSD 3-Clause "New" or "Revised" License](https://github.com/tklauser/go-sysconf/blob/master/LICENSE) - github.com/tklauser/numcpus [Apache License 2.0](https://github.com/tklauser/numcpus/blob/master/LICENSE) diff --git a/go.mod b/go.mod index 10529d396e38b..80ca5d0885206 100644 --- a/go.mod +++ b/go.mod @@ -185,6 +185,7 @@ require ( github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0 github.com/thomasklein94/packer-plugin-libvirt v0.5.0 github.com/tidwall/gjson v1.17.0 + github.com/tidwall/wal v1.1.7 github.com/tinylib/msgp v1.2.0 github.com/urfave/cli/v2 v2.27.2 github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241 @@ -451,6 +452,7 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/tinylru v1.1.0 // indirect github.com/tklauser/go-sysconf v0.3.13 // indirect github.com/tklauser/numcpus v0.7.0 // indirect github.com/twmb/murmur3 v1.1.7 // indirect diff --git a/go.sum b/go.sum index 0833eaae3a9e9..a884b469ac149 100644 --- a/go.sum +++ b/go.sum @@ -2230,6 +2230,7 @@ github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0 h1:8B1u+sDwYhT github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0/go.mod h1:W1+yLUfUl8VLTzvmApP2FBHgCk8I5SKKjDWjxWEc33U= github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs= github.com/thomasklein94/packer-plugin-libvirt v0.5.0/go.mod h1:GwN82FQ6KxCNKtS8LNUgLbwTZs90GGhBzCmTNkrTCrY= +github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM= github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -2238,6 +2239,12 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY= github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro= +github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I= +github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8= +github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4= +github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E= +github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU= +github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k= github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0= github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk= github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk= diff --git a/metric.go b/metric.go index 2cf3d6a06072b..adad3cfc38c33 100644 --- a/metric.go +++ b/metric.go @@ -149,5 +149,6 @@ type UnwrappableMetric interface { type TrackingMetric interface { // TrackingID returns the ID used for tracking the metric TrackingID() TrackingID + TrackingData() TrackingData UnwrappableMetric } diff --git a/metric/deserialize.go b/metric/deserialize.go new file mode 100644 index 0000000000000..a253406a3b50c --- /dev/null +++ b/metric/deserialize.go @@ -0,0 +1,78 @@ +package metric + +import ( + "bytes" + "encoding/gob" + "errors" + "fmt" + "sync" + + "github.com/influxdata/telegraf" +) + +// storage for tracking data that can't be serialized to disk +var ( + // todo need some way to empty this map out when done with a tracking ID. + // grouped tracking metrics means that ID->Data association is not one to one, + // many metrics could be associated with one tracking ID so we cannot just + // clear this every time in FromBytes. + trackingStore = make(map[telegraf.TrackingID]telegraf.TrackingData) + mu = sync.Mutex{} + + // ErrSkipTracking indicates that tracking information could not be found after + // deserializing a metric from bytes. In this case we should skip the metric + // and continue as if it does not exist. + ErrSkipTracking = errors.New("metric tracking data not found") +) + +type serializedMetric struct { + M telegraf.Metric + TID telegraf.TrackingID +} + +func ToBytes(m telegraf.Metric) ([]byte, error) { + var sm serializedMetric + if um, ok := m.(telegraf.UnwrappableMetric); ok { + sm.M = um.Unwrap() + } else { + sm.M = m + } + + if tm, ok := m.(telegraf.TrackingMetric); ok { + sm.TID = tm.TrackingID() + + mu.Lock() + trackingStore[sm.TID] = tm.TrackingData() + mu.Unlock() + } + + var buf bytes.Buffer + encoder := gob.NewEncoder(&buf) + if err := encoder.Encode(&sm); err != nil { + return nil, fmt.Errorf("failed to encode metric to bytes: %w", err) + } + return buf.Bytes(), nil +} + +func FromBytes(b []byte) (telegraf.Metric, error) { + buf := bytes.NewBuffer(b) + decoder := gob.NewDecoder(buf) + + var sm *serializedMetric + if err := decoder.Decode(&sm); err != nil { + return nil, fmt.Errorf("failed to decode metric from bytes: %w", err) + } + + m := sm.M + if sm.TID != 0 { + mu.Lock() + td := trackingStore[sm.TID] + mu.Unlock() + + if td == nil { + return nil, ErrSkipTracking + } + m = rebuildTrackingMetric(m, td) + } + return m, nil +} diff --git a/metric/init.go b/metric/init.go new file mode 100644 index 0000000000000..85c901ce16b39 --- /dev/null +++ b/metric/init.go @@ -0,0 +1,7 @@ +package metric + +import "encoding/gob" + +func Init() { + gob.RegisterName("metric.metric", &metric{}) +} diff --git a/metric/tracking.go b/metric/tracking.go index 50f11c74d6dae..aa37360e62816 100644 --- a/metric/tracking.go +++ b/metric/tracking.go @@ -33,35 +33,36 @@ func newTrackingID() telegraf.TrackingID { } type trackingData struct { - id telegraf.TrackingID - rc int32 - acceptCount int32 - rejectCount int32 + //nolint:revive // method is already named ID + Id telegraf.TrackingID + Rc int32 + AcceptCount int32 + RejectCount int32 notifyFunc NotifyFunc } func (d *trackingData) incr() { - atomic.AddInt32(&d.rc, 1) + atomic.AddInt32(&d.Rc, 1) } func (d *trackingData) decr() int32 { - return atomic.AddInt32(&d.rc, -1) + return atomic.AddInt32(&d.Rc, -1) } func (d *trackingData) accept() { - atomic.AddInt32(&d.acceptCount, 1) + atomic.AddInt32(&d.AcceptCount, 1) } func (d *trackingData) reject() { - atomic.AddInt32(&d.rejectCount, 1) + atomic.AddInt32(&d.RejectCount, 1) } func (d *trackingData) notify() { d.notifyFunc( &deliveryInfo{ - id: d.id, - accepted: int(d.acceptCount), - rejected: int(d.rejectCount), + id: d.Id, + accepted: int(d.AcceptCount), + rejected: int(d.RejectCount), }, ) } @@ -75,10 +76,10 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, m := &trackingMetric{ Metric: metric, d: &trackingData{ - id: newTrackingID(), - rc: 1, - acceptCount: 0, - rejectCount: 0, + Id: newTrackingID(), + Rc: 1, + AcceptCount: 0, + RejectCount: 0, notifyFunc: fn, }, } @@ -86,15 +87,22 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric, if finalizer != nil { runtime.SetFinalizer(m.d, finalizer) } - return m, m.d.id + return m, m.d.Id +} + +func rebuildTrackingMetric(metric telegraf.Metric, td telegraf.TrackingData) telegraf.Metric { + return &trackingMetric{ + Metric: metric, + d: td.(*trackingData), + } } func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.Metric, telegraf.TrackingID) { d := &trackingData{ - id: newTrackingID(), - rc: 0, - acceptCount: 0, - rejectCount: 0, + Id: newTrackingID(), + Rc: 0, + AcceptCount: 0, + RejectCount: 0, notifyFunc: fn, } @@ -114,7 +122,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf. d.notify() } - return group, d.id + return group, d.Id } func (m *trackingMetric) Copy() telegraf.Metric { @@ -152,7 +160,11 @@ func (m *trackingMetric) decr() { // Unwrap allows to access the underlying metric directly e.g. for go-templates func (m *trackingMetric) TrackingID() telegraf.TrackingID { - return m.d.id + return m.d.Id +} + +func (m *trackingMetric) TrackingData() telegraf.TrackingData { + return m.d } // Unwrap allows to access the underlying metric directly e.g. for go-templates @@ -173,3 +185,7 @@ func (r *deliveryInfo) ID() telegraf.TrackingID { func (r *deliveryInfo) Delivered() bool { return r.rejected == 0 } + +func (d *trackingData) ID() telegraf.TrackingID { + return d.Id +} diff --git a/models/buffer.go b/models/buffer.go index 92ea0217845b4..dc6363cc0747f 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -2,14 +2,18 @@ package models import ( "fmt" + "sync" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/selfstat" ) var ( AgentMetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{}) AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{}) + + registerGob = sync.OnceFunc(func() { metric.Init() }) ) type Buffer interface { @@ -45,12 +49,16 @@ type BufferStats struct { } // NewBuffer returns a new empty Buffer with the given capacity. -func NewBuffer(name string, alias string, capacity int, strategy string, _ string) (Buffer, error) { +func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) { + registerGob() + bs := NewBufferStats(name, alias, capacity) switch strategy { case "", "memory": return NewMemoryBuffer(capacity, bs) + case "disk": + return NewDiskBuffer(name, path, bs) } return nil, fmt.Errorf("invalid buffer strategy %q", strategy) } @@ -97,14 +105,14 @@ func (b *BufferStats) metricAdded() { b.MetricsAdded.Incr(1) } -func (b *BufferStats) metricWritten(metric telegraf.Metric) { +func (b *BufferStats) metricWritten(m telegraf.Metric) { AgentMetricsWritten.Incr(1) b.MetricsWritten.Incr(1) - metric.Accept() + m.Accept() } -func (b *BufferStats) metricDropped(metric telegraf.Metric) { +func (b *BufferStats) metricDropped(m telegraf.Metric) { AgentMetricsDropped.Incr(1) b.MetricsDropped.Incr(1) - metric.Reject() + m.Reject() } diff --git a/models/buffer_disk.go b/models/buffer_disk.go new file mode 100644 index 0000000000000..ef05791618247 --- /dev/null +++ b/models/buffer_disk.go @@ -0,0 +1,222 @@ +package models + +import ( + "errors" + "fmt" + "os" + "sync" + + "github.com/tidwall/wal" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" +) + +type DiskBuffer struct { + BufferStats + sync.Mutex + + file *wal.Log + path string + + batchFirst uint64 // Index of the first metric in the batch + batchSize uint64 // Number of metrics currently in the batch + + // Ending point of metrics read from disk on telegraf launch. + // Used to know whether to discard tracking metrics. + originalEnd uint64 +} + +func NewDiskBuffer(name string, path string, stats BufferStats) (*DiskBuffer, error) { + filePath := path + "/" + name + walFile, err := wal.Open(filePath, nil) + if err != nil { + return nil, fmt.Errorf("failed to open wal file: %w", err) + } + return &DiskBuffer{ + BufferStats: stats, + file: walFile, + path: filePath, + }, nil +} + +func (b *DiskBuffer) Len() int { + b.Lock() + defer b.Unlock() + return b.length() +} + +func (b *DiskBuffer) length() int { + // Special case for when the read index is zero, it must be empty (otherwise it would be >= 1) + if b.readIndex() == 0 { + return 0 + } + return int(b.writeIndex() - b.readIndex()) +} + +// readIndex is the first index to start reading metrics from, or the head of the buffer +func (b *DiskBuffer) readIndex() uint64 { + index, err := b.file.FirstIndex() + if err != nil { + panic(err) // can only occur with a corrupt wal file + } + return index +} + +// writeIndex is the first index to start writing metrics to, or the tail of the buffer +func (b *DiskBuffer) writeIndex() uint64 { + index, err := b.file.LastIndex() + if err != nil { + panic(err) // can only occur with a corrupt wal file + } + return index + 1 +} + +func (b *DiskBuffer) Add(metrics ...telegraf.Metric) int { + b.Lock() + defer b.Unlock() + + dropped := 0 + for _, m := range metrics { + if !b.addSingle(m) { + dropped++ + } + } + b.BufferSize.Set(int64(b.length())) + return dropped + // todo implement batched writes +} + +func (b *DiskBuffer) addSingle(m telegraf.Metric) bool { + data, err := metric.ToBytes(m) + if err != nil { + panic(err) + } + err = b.file.Write(b.writeIndex(), data) + if err == nil { + b.metricAdded() + return true + } + return false +} + +//nolint:unused // to be implemented in the future +func (b *DiskBuffer) addBatch(metrics []telegraf.Metric) int { + written := 0 + batch := new(wal.Batch) + for _, m := range metrics { + data, err := metric.ToBytes(m) + if err != nil { + panic(err) + } + batch.Write(b.writeIndex(), data) + b.metricAdded() + written++ + } + err := b.file.WriteBatch(batch) + if err != nil { + return 0 // todo error handle, test if a partial write occur + } + return written +} + +func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { + b.Lock() + defer b.Unlock() + + if b.length() == 0 { + // no metrics in the wal file, so return an empty array + return []telegraf.Metric{} + } + b.batchFirst = b.readIndex() + var metrics []telegraf.Metric + + b.batchSize = 0 + readIndex := b.batchFirst + endIndex := b.writeIndex() + for batchSize > 0 && readIndex < endIndex { + data, err := b.file.Read(readIndex) + if err != nil { + panic(err) + } + readIndex++ + + m, err := metric.FromBytes(data) + if errors.Is(err, metric.ErrSkipTracking) { + // could not look up tracking information for metric, skip + continue + } + if err != nil { + // non-recoverable error in deserialization, abort + panic(err) + } + if _, ok := m.(telegraf.TrackingMetric); ok && readIndex < b.originalEnd { + // tracking metric left over from previous instance, skip + continue + } + + metrics = append(metrics, m) + b.batchSize++ + batchSize-- + } + return metrics +} + +func (b *DiskBuffer) Accept(batch []telegraf.Metric) { + b.Lock() + defer b.Unlock() + + if b.batchSize == 0 || len(batch) == 0 { + // nothing to accept + return + } + for _, m := range batch { + b.metricWritten(m) + } + if b.length() == len(batch) { + b.resetWalFile() + } else { + err := b.file.TruncateFront(b.batchFirst + uint64(len(batch))) + if err != nil { + panic(err) + } + } + + // check if the original end index is still valid, clear if not + if b.originalEnd < b.readIndex() { + b.originalEnd = 0 + } + + b.resetBatch() + b.BufferSize.Set(int64(b.length())) +} + +func (b *DiskBuffer) Reject(_ []telegraf.Metric) { + // very little to do here as the disk buffer retains metrics in + // the wal file until a call to accept + b.Lock() + defer b.Unlock() + b.resetBatch() +} + +func (b *DiskBuffer) Stats() BufferStats { + return b.BufferStats +} + +func (b *DiskBuffer) resetBatch() { + b.batchFirst = 0 + b.batchSize = 0 +} + +// todo This is very messy and not ideal, but serves as the only way I can find currently +// todo to actually clear the walfile completely if needed, since Truncate() calls require +// todo at least one entry remains in them otherwise they return an error. +func (b *DiskBuffer) resetWalFile() { + b.file.Close() + os.Remove(b.path) + walFile, err := wal.Open(b.path, nil) + if err != nil { + panic(err) + } + b.file = walFile +} diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go new file mode 100644 index 0000000000000..a8eda7dfc7e08 --- /dev/null +++ b/models/buffer_disk_test.go @@ -0,0 +1,95 @@ +package models + +import ( + "fmt" + "io" + "os" + "path/filepath" + "testing" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/require" +) + +func newTestDiskBuffer(t testing.TB) Buffer { + path, err := os.MkdirTemp("", "*-buffer-test") + require.NoError(t, err) + return newTestDiskBufferWithPath(t, "test", path) +} + +func newTestDiskBufferWithPath(t testing.TB, name string, path string) Buffer { + t.Helper() + buf, err := NewBuffer(name, "", 0, "disk", path) + require.NoError(t, err) + buf.Stats().MetricsAdded.Set(0) + buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsDropped.Set(0) + return buf +} + +func TestBuffer_RetainsTrackingInformation(t *testing.T) { + var delivered int + mm, _ := metric.WithTracking(Metric(), func(_ telegraf.DeliveryInfo) { + delivered++ + }) + b := newTestDiskBuffer(t) + b.Add(mm) + batch := b.Batch(1) + b.Accept(batch) + require.Equal(t, 1, delivered) +} + +// WAL file tested here was written as: +// 1: Metric() +// 2: Metric() +// 3: Metric() +// 4: metric.WithTracking(Metric()) +// 5: Metric() +// +// Expected to drop the 4th metric, as tracking metrics from +// previous instances are dropped when the wal file is reopened. +func TestBuffer_TrackingDroppedFromOldWal(t *testing.T) { + // copy the testdata so we do not destroy the testdata wal file + path, err := os.MkdirTemp("", "*-buffer-test") + require.NoError(t, err) + f, err := os.Create(path + "/00000000000000000001") + require.NoError(t, err) + f1, err := os.Open("testdata/testwal/00000000000000000001") + require.NoError(t, err) + written, err := io.Copy(f, f1) + require.NoError(t, err) + fmt.Println(written) + + b := newTestDiskBufferWithPath(t, filepath.Base(path), filepath.Dir(path)) + batch := b.Batch(4) + expected := []telegraf.Metric{ + Metric(), Metric(), Metric(), Metric(), + } + testutil.RequireMetricsEqual(t, expected, batch) +} + +/* +// Function used to create the test data used in the test above +func Test_CreateTestData(t *testing.T) { + metric.Init() + walfile, _ := wal.Open("testdata/testwal", nil) + data, err := metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(1, data)) + data, err = metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(2, data)) + data, err = metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(3, data)) + m, _ := metric.WithTracking(Metric(), func(di telegraf.DeliveryInfo) {}) + data, err = metric.ToBytes(m) + require.NoError(t, err) + require.NoError(t, walfile.Write(4, data)) + data, err = metric.ToBytes(Metric()) + require.NoError(t, err) + require.NoError(t, walfile.Write(5, data)) +} +*/ diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index 12803184f4e94..eec7c8b39c01f 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -16,6 +16,21 @@ func newTestMemoryBuffer(t testing.TB, capacity int) Buffer { return buf } +func TestBuffer_AcceptCallsMetricAccept(t *testing.T) { + var accept int + mm := &MockMetric{ + Metric: Metric(), + AcceptF: func() { + accept++ + }, + } + b := newTestMemoryBuffer(t, 5) + b.Add(mm, mm, mm) + batch := b.Batch(2) + b.Accept(batch) + require.Equal(t, 2, accept) +} + func BenchmarkAddMetrics(b *testing.B) { buf := newTestMemoryBuffer(b, 10000) m := Metric() diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go index 19df313558a65..a984df41e1614 100644 --- a/models/buffer_suite_test.go +++ b/models/buffer_suite_test.go @@ -43,6 +43,11 @@ func (s *BufferSuiteTest) SetupTest() { switch s.bufferType { case "", "memory": s.hasMaxCapacity = true + case "disk": + path, err := os.MkdirTemp("", "*-buffer-test") + s.Require().NoError(err) + s.bufferPath = path + s.hasMaxCapacity = false } } @@ -57,6 +62,10 @@ func TestMemoryBufferSuite(t *testing.T) { suite.Run(t, &BufferSuiteTest{bufferType: "memory"}) } +func TestDiskBufferSuite(t *testing.T) { + suite.Run(t, &BufferSuiteTest{bufferType: "disk"}) +} + func Metric() telegraf.Metric { return MetricTime(0) } @@ -671,21 +680,6 @@ func (s *BufferSuiteTest) TestBuffer_BatchRejectAcceptNoop() { s.Equal(5, b.Len()) } -func (s *BufferSuiteTest) TestBuffer_AcceptCallsMetricAccept() { - var accept int - mm := &MockMetric{ - Metric: Metric(), - AcceptF: func() { - accept++ - }, - } - b := s.newTestBuffer(5) - b.Add(mm, mm, mm) - batch := b.Batch(2) - b.Accept(batch) - s.Equal(2, accept) -} - func (s *BufferSuiteTest) TestBuffer_AddCallsMetricRejectWhenNoBatch() { if !s.hasMaxCapacity { s.T().Skip("tested buffer does not have a maximum capacity") diff --git a/models/testdata/testwal/00000000000000000001 b/models/testdata/testwal/00000000000000000001 new file mode 100644 index 0000000000000000000000000000000000000000..a4d72e966edfb738eb6a137c512fdbd7eaf1437f GIT binary patch literal 1812 zcmaFCq+QR<$S6>pT9lcXlUbFT;+tAhl$p%5EV_@WgNV{dG=A-~c zx)3Tdfg1kzAo!I9sf;W@Tjl<@0$su%9jjH6nvW%UTAuhdnJPKe5EjgoB0QpQ{4{BR?ZhBVYN%??8%y;ol5U;Jp|P e>Cuo58D44n)o7X~HBDQPReCToj+P&EE2#k!ulq{? literal 0 HcmV?d00001