-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmain.go
112 lines (90 loc) · 2.23 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/last9/k8stream/io"
"gopkg.in/alecthomas/kingpin.v2"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// VERSION is the release string of this binary. main registers it with
// kingpin and passes it to io.StartHeartbeat.
const (
	VERSION = "0.0.5"
)
// configFile is the mandatory "config" command-line flag. main dereferences
// it after kingpin.Parse and hands the result to io.ReadConfig.
var configFile = kingpin.
	Flag("config", "Config File to Parse").
	Required().
	File()
// getFlusher asks the io package for a Flusher built from the Config
// embedded in conf and returns whatever io.GetFlusher reports.
func getFlusher(conf *L9K8streamConfig) (io.Flusher, error) {
	ioConf := &conf.Config
	return io.GetFlusher(ioConf)
}
// main is the program entry point. In order, it:
//
//  1. parses the command line and configures the standard logger,
//  2. reads and decodes the configuration file,
//  3. starts the heartbeat, then records the raw config and applies defaults,
//  4. builds the Kubernetes client, the cache and the flusher,
//  5. starts the ingester and attaches one Handler to the Service and Event
//     informers,
//  6. waits for the Event informer to sync and then blocks in trapSignal,
//     exiting with the code it returns.
//
// Any error during steps 2-4 terminates the process through log.Fatal.
func main() {
	kingpin.Version(VERSION)
	kingpin.Parse()

	log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)

	// Read the configuration file and decode it into the typed config.
	rawConf, err := io.ReadConfig(*configFile)
	if err != nil {
		log.Fatal(err)
	}

	conf := new(L9K8streamConfig)
	if loadErr := io.LoadConfig(rawConf, conf); loadErr != nil {
		log.Fatal(loadErr)
	}

	// The heartbeat is started with the values as loaded, i.e. before
	// setDefaults runs below.
	if beatErr := io.StartHeartbeat(
		VERSION,
		conf.UID,
		conf.HeartbeatHook,
		conf.HeartbeatInterval,
		conf.HeartbeatTimeout,
	); beatErr != nil {
		log.Fatal(beatErr)
	}

	// Keep the undecoded config alongside the decoded one, then fill in defaults.
	conf.Raw = rawConf
	setDefaults(conf)

	// Kubernetes client built from the configured kubeconfig.
	client, err := newK8sClient(conf.KubeConfig)
	if err != nil {
		log.Fatal(err)
	}

	// Cache shared between the ingester and the handler.
	lru, err := newCache()
	if err != nil {
		log.Fatal(err)
	}

	// Flusher instance provided by the io package.
	flusher, err := getFlusher(conf)
	if err != nil {
		log.Fatal(err)
	}

	// The ingester hands back the channel that the handler is given below.
	ingest := startIngester(flusher, conf, lru)
	handler := &Handler{client, ingest, lru, conf}

	stopCh := make(chan struct{})

	resync := time.Duration(conf.ResyncInterval) * time.Second
	factory := informers.NewSharedInformerFactory(client.Clientset, resync)
	coreV1 := factory.Core().V1()

	// Services get their own informer because service changes do not show
	// up in the default events interface.
	serviceInformer := coreV1.Services().Informer()
	serviceInformer.AddEventHandler(handler)
	go serviceInformer.Run(stopCh)

	eventInformer := coreV1.Events().Informer()
	eventInformer.AddEventHandler(handler)
	go eventInformer.Run(stopCh)

	// Only the Event informer is awaited here.
	if synced := cache.WaitForCacheSync(stopCh, eventInformer.HasSynced); !synced {
		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return
	}

	// Block until a signal arrives and exit with the code trapSignal picks.
	exitCode := trapSignal(stopCh)
	os.Exit(exitCode)
}
// trapSignal blocks until the process receives SIGTERM, SIGINT or SIGQUIT,
// closes stopCh so that everything watching it shuts down, and returns the
// exit code the process should terminate with.
//
// It returns 1 when the signal was SIGQUIT (after a 300ms pause) and 0 for
// SIGTERM and SIGINT.
func trapSignal(stopCh chan<- struct{}) int {
	// signal.Notify never blocks while delivering, so with an unbuffered
	// channel a signal that arrives before the receive below is ready would
	// be dropped. Only the first signal is consumed, so one slot is enough.
	sigCh := make(chan os.Signal, 1)

	// SIGKILL (os.Kill) cannot be caught, so registering it has no effect.
	// SIGTERM is the catchable termination request and is handled instead.
	signal.Notify(sigCh, syscall.SIGTERM, os.Interrupt, syscall.SIGQUIT)

	received := <-sigCh
	close(stopCh)

	if received == syscall.SIGQUIT {
		// NOTE(review): presumably a grace period for goroutines watching
		// stopCh to wind down before the process exits - confirm.
		time.Sleep(300 * time.Millisecond)
		return 1
	}
	return 0
}