Skip to content

Commit

Permalink
feat: metric disk buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
DStrand1 committed Jul 10, 2024
1 parent 3a05c38 commit 3afd32d
Show file tree
Hide file tree
Showing 15 changed files with 498 additions and 42 deletions.
4 changes: 4 additions & 0 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type Accumulator interface {
// TrackingID uniquely identifies a tracked metric group
type TrackingID uint64

type TrackingData interface {
ID() TrackingID
}

// DeliveryInfo provides the results of a delivered metric group.
type DeliveryInfo interface {
// ID is the TrackingID
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ type AgentConfig struct {
// Number of attempts to obtain a remote configuration via a URL during
// startup. Set to -1 for unlimited attempts.
ConfigURLRetryAttempts int `toml:"config_url_retry_attempts"`

BufferStrategy string `toml:"buffer_strategy"`
BufferDirectory string `toml:"buffer_directory"`
}

// InputNames returns a list of strings of the configured inputs.
Expand Down Expand Up @@ -1521,6 +1524,8 @@ func (c *Config) buildOutput(name string, tbl *ast.Table) (*models.OutputConfig,
c.getFieldString(tbl, "name_suffix", &oc.NameSuffix)
c.getFieldString(tbl, "name_prefix", &oc.NamePrefix)
c.getFieldString(tbl, "startup_error_behavior", &oc.StartupErrorBehavior)
c.getFieldString(tbl, "buffer_strategy", &oc.BufferStrategy)
c.getFieldString(tbl, "buffer_directory", &oc.BufferDirectory)

if c.hasErrs() {
return nil, c.firstErr()
Expand Down
2 changes: 2 additions & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ following works:
- github.com/tidwall/gjson [MIT License](https://github.com/tidwall/gjson/blob/master/LICENSE)
- github.com/tidwall/match [MIT License](https://github.com/tidwall/match/blob/master/LICENSE)
- github.com/tidwall/pretty [MIT License](https://github.com/tidwall/pretty/blob/master/LICENSE)
- github.com/tidwall/tinylru [MIT License](https://github.com/tidwall/tinylru/blob/master/LICENSE)
- github.com/tidwall/wal [MIT License](https://github.com/tidwall/wal/blob/master/LICENSE)
- github.com/tinylib/msgp [MIT License](https://github.com/tinylib/msgp/blob/master/LICENSE)
- github.com/tklauser/go-sysconf [BSD 3-Clause "New" or "Revised" License](https://github.com/tklauser/go-sysconf/blob/master/LICENSE)
- github.com/tklauser/numcpus [Apache License 2.0](https://github.com/tklauser/numcpus/blob/master/LICENSE)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ require (
github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0
github.com/thomasklein94/packer-plugin-libvirt v0.5.0
github.com/tidwall/gjson v1.17.0
github.com/tidwall/wal v1.1.7
github.com/tinylib/msgp v1.2.0
github.com/urfave/cli/v2 v2.27.2
github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241
Expand Down Expand Up @@ -451,6 +452,7 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/tinylru v1.1.0 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/twmb/murmur3 v1.1.7 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,7 @@ github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0 h1:8B1u+sDwYhT
github.com/testcontainers/testcontainers-go/modules/kafka v0.31.0/go.mod h1:W1+yLUfUl8VLTzvmApP2FBHgCk8I5SKKjDWjxWEc33U=
github.com/thomasklein94/packer-plugin-libvirt v0.5.0 h1:aj2HLHZZM/ClGLIwVp9rrgh+2TOU/w4EiaZHAwCpOgs=
github.com/thomasklein94/packer-plugin-libvirt v0.5.0/go.mod h1:GwN82FQ6KxCNKtS8LNUgLbwTZs90GGhBzCmTNkrTCrY=
github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.17.0 h1:/Jocvlh98kcTfpN2+JzGQWQcqrPQwDrVEMApx/M5ZwM=
github.com/tidwall/gjson v1.17.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
Expand All @@ -2238,6 +2239,12 @@ github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tinylib/msgp v1.2.0 h1:0uKB/662twsVBpYUPbokj4sTSKhWFKB7LopO2kWK8lY=
github.com/tinylib/msgp v1.2.0/go.mod h1:2vIGs3lcUo8izAATNobrCHevYZC/LMsJtw4JPiYPHro=
github.com/tidwall/tinylru v1.1.0 h1:XY6IUfzVTU9rpwdhKUF6nQdChgCdGjkMfLzbWyiau6I=
github.com/tidwall/tinylru v1.1.0/go.mod h1:3+bX+TJ2baOLMWTnlyNWHh4QMnFyARg2TLTQ6OFbzw8=
github.com/tidwall/wal v1.1.7 h1:emc1TRjIVsdKKSnpwGBAcsAGg0767SvUk8+ygx7Bb+4=
github.com/tidwall/wal v1.1.7/go.mod h1:r6lR1j27W9EPalgHiB7zLJDYu3mzW5BQP5KrzBpYY/E=
github.com/tinylib/msgp v1.1.9 h1:SHf3yoO2sGA0veCJeCBYLHuttAVFHGm2RHgNodW7wQU=
github.com/tinylib/msgp v1.1.9/go.mod h1:BCXGB54lDD8qUEPmiG0cQQUANC4IUQyB2ItS2UDlO/k=
github.com/tj/assert v0.0.0-20171129193455-018094318fb0/go.mod h1:mZ9/Rh9oLWpLLDRpvE+3b7gP/C2YyLFYxNmcLnPTMe0=
github.com/tj/assert v0.0.3 h1:Df/BlaZ20mq6kuai7f5z2TvPFiwC3xaWJSDQNiIS3Rk=
github.com/tj/assert v0.0.3/go.mod h1:Ne6X72Q+TB1AteidzQncjw9PabbMp4PBMZ1k+vd1Pvk=
Expand Down
1 change: 1 addition & 0 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,6 @@ type UnwrappableMetric interface {
type TrackingMetric interface {
// TrackingID returns the ID used for tracking the metric
TrackingID() TrackingID
TrackingData() TrackingData
UnwrappableMetric
}
78 changes: 78 additions & 0 deletions metric/deserialize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package metric

import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"sync"

"github.com/influxdata/telegraf"
)

// storage for tracking data that can't be serialized to disk
var (
// todo need some way to empty this map out when done with a tracking ID.
// grouped tracking metrics means that ID->Data association is not one to one,
// many metrics could be associated with one tracking ID so we cannot just
// clear this every time in FromBytes.
trackingStore = make(map[telegraf.TrackingID]telegraf.TrackingData)
mu = sync.Mutex{}

// ErrSkipTracking indicates that tracking information could not be found after
// deserializing a metric from bytes. In this case we should skip the metric
// and continue as if it does not exist.
ErrSkipTracking = errors.New("metric tracking data not found")
)

type serializedMetric struct {
M telegraf.Metric
TID telegraf.TrackingID
}

func ToBytes(m telegraf.Metric) ([]byte, error) {
var sm serializedMetric
if um, ok := m.(telegraf.UnwrappableMetric); ok {
sm.M = um.Unwrap()
} else {
sm.M = m
}

if tm, ok := m.(telegraf.TrackingMetric); ok {
sm.TID = tm.TrackingID()

mu.Lock()
trackingStore[sm.TID] = tm.TrackingData()
mu.Unlock()
}

var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(&sm); err != nil {
return nil, fmt.Errorf("failed to encode metric to bytes: %w", err)
}
return buf.Bytes(), nil
}

func FromBytes(b []byte) (telegraf.Metric, error) {
buf := bytes.NewBuffer(b)
decoder := gob.NewDecoder(buf)

var sm *serializedMetric
if err := decoder.Decode(&sm); err != nil {
return nil, fmt.Errorf("failed to decode metric from bytes: %w", err)
}

m := sm.M
if sm.TID != 0 {
mu.Lock()
td := trackingStore[sm.TID]
mu.Unlock()

if td == nil {
return nil, ErrSkipTracking
}
m = rebuildTrackingMetric(m, td)
}
return m, nil
}
7 changes: 7 additions & 0 deletions metric/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package metric

import "encoding/gob"

func Init() {
gob.RegisterName("metric.metric", &metric{})
}
60 changes: 38 additions & 22 deletions metric/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,35 +33,36 @@ func newTrackingID() telegraf.TrackingID {
}

type trackingData struct {
id telegraf.TrackingID
rc int32
acceptCount int32
rejectCount int32
//nolint:revive // method is already named ID
Id telegraf.TrackingID
Rc int32
AcceptCount int32
RejectCount int32
notifyFunc NotifyFunc
}

func (d *trackingData) incr() {
atomic.AddInt32(&d.rc, 1)
atomic.AddInt32(&d.Rc, 1)
}

func (d *trackingData) decr() int32 {
return atomic.AddInt32(&d.rc, -1)
return atomic.AddInt32(&d.Rc, -1)
}

func (d *trackingData) accept() {
atomic.AddInt32(&d.acceptCount, 1)
atomic.AddInt32(&d.AcceptCount, 1)
}

func (d *trackingData) reject() {
atomic.AddInt32(&d.rejectCount, 1)
atomic.AddInt32(&d.RejectCount, 1)
}

func (d *trackingData) notify() {
d.notifyFunc(
&deliveryInfo{
id: d.id,
accepted: int(d.acceptCount),
rejected: int(d.rejectCount),
id: d.Id,
accepted: int(d.AcceptCount),
rejected: int(d.RejectCount),
},
)
}
Expand All @@ -75,26 +76,33 @@ func newTrackingMetric(metric telegraf.Metric, fn NotifyFunc) (telegraf.Metric,
m := &trackingMetric{
Metric: metric,
d: &trackingData{
id: newTrackingID(),
rc: 1,
acceptCount: 0,
rejectCount: 0,
Id: newTrackingID(),
Rc: 1,
AcceptCount: 0,
RejectCount: 0,
notifyFunc: fn,
},
}

if finalizer != nil {
runtime.SetFinalizer(m.d, finalizer)
}
return m, m.d.id
return m, m.d.Id
}

func rebuildTrackingMetric(metric telegraf.Metric, td telegraf.TrackingData) telegraf.Metric {
return &trackingMetric{
Metric: metric,
d: td.(*trackingData),
}
}

func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.Metric, telegraf.TrackingID) {
d := &trackingData{
id: newTrackingID(),
rc: 0,
acceptCount: 0,
rejectCount: 0,
Id: newTrackingID(),
Rc: 0,
AcceptCount: 0,
RejectCount: 0,
notifyFunc: fn,
}

Expand All @@ -114,7 +122,7 @@ func newTrackingMetricGroup(group []telegraf.Metric, fn NotifyFunc) ([]telegraf.
d.notify()
}

return group, d.id
return group, d.Id
}

func (m *trackingMetric) Copy() telegraf.Metric {
Expand Down Expand Up @@ -152,7 +160,11 @@ func (m *trackingMetric) decr() {

// Unwrap allows to access the underlying metric directly e.g. for go-templates
func (m *trackingMetric) TrackingID() telegraf.TrackingID {
return m.d.id
return m.d.Id
}

func (m *trackingMetric) TrackingData() telegraf.TrackingData {
return m.d
}

// Unwrap allows to access the underlying metric directly e.g. for go-templates
Expand All @@ -173,3 +185,7 @@ func (r *deliveryInfo) ID() telegraf.TrackingID {
func (r *deliveryInfo) Delivered() bool {
return r.rejected == 0
}

func (d *trackingData) ID() telegraf.TrackingID {
return d.Id
}
18 changes: 13 additions & 5 deletions models/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package models

import (
"fmt"
"sync"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/selfstat"
)

var (
AgentMetricsWritten = selfstat.Register("agent", "metrics_written", map[string]string{})
AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", map[string]string{})

registerGob = sync.OnceFunc(func() { metric.Init() })
)

type Buffer interface {
Expand Down Expand Up @@ -45,12 +49,16 @@ type BufferStats struct {
}

// NewBuffer returns a new empty Buffer with the given capacity.
func NewBuffer(name string, alias string, capacity int, strategy string, _ string) (Buffer, error) {
func NewBuffer(name string, alias string, capacity int, strategy string, path string) (Buffer, error) {
registerGob()

bs := NewBufferStats(name, alias, capacity)

switch strategy {
case "", "memory":
return NewMemoryBuffer(capacity, bs)
case "disk":
return NewDiskBuffer(name, path, bs)
}
return nil, fmt.Errorf("invalid buffer strategy %q", strategy)
}
Expand Down Expand Up @@ -97,14 +105,14 @@ func (b *BufferStats) metricAdded() {
b.MetricsAdded.Incr(1)
}

func (b *BufferStats) metricWritten(metric telegraf.Metric) {
func (b *BufferStats) metricWritten(m telegraf.Metric) {
AgentMetricsWritten.Incr(1)
b.MetricsWritten.Incr(1)
metric.Accept()
m.Accept()
}

func (b *BufferStats) metricDropped(metric telegraf.Metric) {
func (b *BufferStats) metricDropped(m telegraf.Metric) {
AgentMetricsDropped.Incr(1)
b.MetricsDropped.Incr(1)
metric.Reject()
m.Reject()
}
Loading

0 comments on commit 3afd32d

Please sign in to comment.