forked from tsaikd/gogstash
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinputbeats.go
127 lines (113 loc) · 3.08 KB
/
inputbeats.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package inputbeats
import (
"context"
"crypto/tls"
"fmt"
"net"
"github.com/elastic/go-lumber/server"
reuse "github.com/libp2p/go-reuseport"
codecjson "github.com/tsaikd/gogstash/codec/json"
"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)
// ModuleName is the name used in config file
const ModuleName = "beats"
// InputConfig holds the configuration json fields and internal objects
type InputConfig struct {
config.InputConfig
// The IP address to listen on, defaults to "0.0.0.0"
Host string `json:"host"`
// The port to listen on.
Port int `json:"port"`
// Here we enable SO_REUSEPORT, see more information:
// https://github.com/libp2p/go-reuseport
ReusePort bool `json:"reuseport"`
// Enable ssl transport, defaults to false
SSL bool `json:"ssl"`
// SSL certificate to use.
SSLCertificate string `json:"ssl_certificate"`
// SSL key to use.
SSLKey string `json:"ssl_key"`
// SSL Verify, defaults to false
SSLVerify bool `json:"ssl_verify"`
tlsConfig *tls.Config
}
// DefaultInputConfig returns an InputConfig struct with default values
func DefaultInputConfig() InputConfig {
return InputConfig{
InputConfig: config.InputConfig{
CommonConfig: config.CommonConfig{
Type: ModuleName,
},
},
Host: "0.0.0.0",
}
}
// InitHandler initialize the input plugin
func InitHandler(ctx context.Context, raw *config.ConfigRaw) (config.TypeInputConfig, error) {
conf := DefaultInputConfig()
err := config.ReflectConfig(raw, &conf)
if err != nil {
return nil, err
}
if !conf.SSL {
if conf.SSLCertificate != "" {
goglog.Logger.Warn("beats input: SSL Certificate will not be used")
}
if conf.SSLKey != "" {
goglog.Logger.Warn("beats input: SSL Key will not be used")
}
} else {
// SSL enabled
cer, err := tls.LoadX509KeyPair(conf.SSLCertificate, conf.SSLKey)
if err != nil {
return nil, err
}
conf.tlsConfig = &tls.Config{Certificates: []tls.Certificate{cer}}
if !conf.SSLVerify {
conf.tlsConfig.InsecureSkipVerify = true
}
}
conf.Codec, err = config.GetCodecDefault(ctx, *raw, codecjson.ModuleName)
if err != nil {
return nil, err
}
return &conf, nil
}
// Start wraps the actual function starting the plugin
func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error) {
addr := fmt.Sprintf("%s:%d", t.Host, t.Port)
s, err := server.ListenAndServeWith(func(network, addr string) (l net.Listener, err error) {
if t.ReusePort {
l, err = reuse.Listen(network, addr)
} else {
l, err = net.Listen(network, addr)
}
if err != nil {
return nil, err
}
if t.SSL {
l = tls.NewListener(l, t.tlsConfig)
}
return l, err
}, addr, server.JSONDecoder(t.Codec.DecodeEvent))
if err != nil {
return err
}
defer s.Close()
goglog.Logger.Infof("beats input: start listening on %s", addr)
for {
select {
case <-ctx.Done():
goglog.Logger.Info("input beats stopped")
return nil
case data := <-s.ReceiveChan():
for _, e := range data.Events {
msgChan <- e.(logevent.LogEvent)
}
data.ACK()
}
}
return nil
}