-
Notifications
You must be signed in to change notification settings - Fork 2
/
plugin.go
117 lines (99 loc) · 2.63 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Package plugin implements the global context and objects required to run an instance of a plugin,
// as well as the interfaces for input and output plugins.
package plugin
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/calyptia/plugin/metric"
)
// atomicUint32 is the "plugin registered" flag: 0 until mustOnce has run, 1 afterwards.
// mustOnce reads and writes it through the sync/atomic functions.
var atomicUint32 uint32
// Registration state, filled in by RegisterInput or RegisterOutput.
var (
	// theName is the plugin name passed to RegisterInput or RegisterOutput.
	theName string
	// theDesc is the plugin description passed to RegisterInput or RegisterOutput.
	theDesc string
	// theInput is the plugin passed to RegisterInput; RegisterOutput leaves it untouched.
	theInput InputPlugin
	// theOutput is the plugin passed to RegisterOutput; RegisterInput leaves it untouched.
	theOutput OutputPlugin
)
// Package-level run-time state. In the code visible here only init touches these
// (registerWG and theChannel); the notes on the others are inferred from their
// names and types — TODO confirm against the code that uses them.
var (
	// registerWG gets one pending unit in init; presumably released once a plugin has registered.
	registerWG sync.WaitGroup
	// initWG presumably tracks plugin initialisation — NOTE(review): confirm.
	initWG sync.WaitGroup
	// runCtx is presumably the context handed to the running plugin — NOTE(review): confirm.
	runCtx context.Context
	// runCancel is presumably the cancel function paired with runCtx — NOTE(review): confirm.
	runCancel context.CancelFunc
	// theChannel carries Message values; init sets it to nil.
	theChannel chan Message
)
func init() {
registerWG.Add(1)
theChannel = nil
}
// Fluentbit bundles the facilities passed to a plugin's Init method:
// configuration lookup, metric construction and logging.
type Fluentbit struct {
	// Conf looks up configuration values by key.
	Conf ConfigLoader
	// Metrics builds counters and gauges.
	Metrics Metrics
	// Logger emits messages at error, warn, info and debug level.
	Logger Logger
}
// InputPlugin interface to represent an input fluent-bit plugin.
// An implementation is handed to RegisterInput.
type InputPlugin interface {
	// Init receives the Fluentbit handle (configuration, metrics, logger).
	// NOTE(review): presumably called once before Collect — confirm against the runtime.
	Init(ctx context.Context, fbit *Fluentbit) error
	// Collect is given a send-only channel on which the plugin emits Message values.
	Collect(ctx context.Context, ch chan<- Message) error
}
// OutputPlugin interface to represent an output fluent-bit plugin.
// An implementation is handed to RegisterOutput.
type OutputPlugin interface {
	// Init receives the Fluentbit handle (configuration, metrics, logger).
	// NOTE(review): presumably called once before Flush — confirm against the runtime.
	Init(ctx context.Context, fbit *Fluentbit) error
	// Flush is given a receive-only channel from which the plugin reads Message values.
	Flush(ctx context.Context, ch <-chan Message) error
}
// ConfigLoader interface to represent a fluent-bit configuration loader.
type ConfigLoader interface {
	// String returns the configuration value stored under key as a string.
	// NOTE(review): the result for a missing key is not specified here — confirm with the implementation.
	String(key string) string
}
// Logger interface to represent a fluent-bit logging mechanism.
// Each method takes a format string followed by its arguments
// (presumably fmt.Printf-style verbs — confirm with the implementation).
type Logger interface {
	// Error logs at error level.
	Error(format string, a ...any)
	// Warn logs at warning level.
	Warn(format string, a ...any)
	// Info logs at info level.
	Info(format string, a ...any)
	// Debug logs at debug level.
	Debug(format string, a ...any)
}
// Metrics builder.
// It creates the metric objects defined in the metric package.
type Metrics interface {
	// NewCounter returns a metric.Counter with the given name and description;
	// labelValues are optional.
	NewCounter(name, desc string, labelValues ...string) metric.Counter
	// NewGauge returns a metric.Gauge with the given name and description;
	// labelValues are optional.
	NewGauge(name, desc string, labelValues ...string) metric.Gauge
}
// Message struct to store a fluent-bit message that is collected (input) or flushed (output)
// from a plugin implementation.
type Message struct {
	// Time is the timestamp carried by the message.
	Time time.Time
	// Record should be a map or a struct.
	Record any
	// tag is unexported; read it through the Tag method, which maps nil to "".
	tag *string
}
// Tag returns the tag attached to the message, or the empty string when
// no tag has been set. Tags are available at output.
func (m Message) Tag() string {
	if tag := m.tag; tag != nil {
		return *tag
	}
	return ""
}
// mustOnce may be called only once; any further call panics with
// "plugin already registered". It enforces a single plugin per file.
//
// The flag is claimed with one compare-and-swap (0 -> 1), so the check and
// the set are a single atomic step: if several goroutines call mustOnce at
// the same time, exactly one returns and all the others panic.
func mustOnce() {
	if !atomic.CompareAndSwapUint32(&atomicUint32, 0, 1) {
		panic("plugin already registered")
	}
}
// RegisterInput records in as the input plugin, together with its name and
// description. Call it only once per file: it goes through mustOnce first,
// which panics if a plugin has already been registered, in which case nothing
// is recorded.
func RegisterInput(name, desc string, in InputPlugin) {
	mustOnce()

	theInput = in
	theName, theDesc = name, desc
}
// RegisterOutput records out as the output plugin, together with its name and
// description. Call it only once per file: it goes through mustOnce first,
// which panics if a plugin has already been registered, in which case nothing
// is recorded.
func RegisterOutput(name, desc string, out OutputPlugin) {
	mustOnce()

	theOutput = out
	theName, theDesc = name, desc
}