-
Notifications
You must be signed in to change notification settings - Fork 4
/
loader.go
executable file
·121 lines (109 loc) · 3.42 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package go4data
import (
"context"
"errors"
"io/ioutil"
"github.com/percybolmer/go4data/property"
"github.com/percybolmer/go4data/register"
"gopkg.in/yaml.v3"
)
// Save serialises data as YAML and writes the result to the file at path.
// The file is written with permission bits 0644. The error from the YAML
// encoder or from the file write is returned unchanged; nil means the file
// was written.
func Save(path string, data interface{}) error {
	encoded, marshalErr := yaml.Marshal(data)
	if marshalErr != nil {
		return marshalErr
	}
	if writeErr := ioutil.WriteFile(path, encoded, 0644); writeErr != nil {
		return writeErr
	}
	return nil
}
// Load reads the YAML file at path, decodes it into a list of
// LoaderProccessor entries and converts each entry, in file order, with
// ConvertToProcessor.
//
// It returns the converted processors, or a nil slice and the first error
// met while reading the file, decoding the YAML or converting an entry.
// Because ConvertToProcessor starts entries whose Running flag is set, some
// of the returned processors may already be started.
func Load(path string) ([]*Processor, error) {
	raw, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}

	var stored []*LoaderProccessor
	if err := yaml.Unmarshal(raw, &stored); err != nil {
		return nil, err
	}

	var loaded []*Processor
	for i := range stored {
		converted, err := stored[i].ConvertToProcessor()
		if err != nil {
			return nil, err
		}
		loaded = append(loaded, converted)
	}
	return loaded, nil
}
// LoaderProccessor is the serialisable description of a Processor that is
// read from and written to JSON/YAML configuration. ConvertToProcessor turns
// it back into a runnable Processor.
type LoaderProccessor struct {
// ID is a unique identifier for each processor.
ID uint `json:"id" yaml:"id"`
// Name is a user-configured name that makes a Processor easier to refer to
// than its ID. Names cannot be duplicated for now: duplicates currently
// crash PrometheusMetrics, and the restriction is meant to be lifted later.
Name string `json:"name" yaml:"name"`
// Running indicates whether the processor is currently running. When it is
// true, ConvertToProcessor starts the rebuilt processor.
Running bool `json:"running" yaml:"running"`
// Workers is the number of concurrent handlers to run. ConvertToProcessor
// treats a value of 0 as 1.
Workers int `json:"workers" yaml:"workers"`
// Topics lists the topics that payloads are published onto.
Topics []string `json:"topics" yaml:"topics"`
// Subscriptions lists the topics to subscribe to.
Subscriptions []string `json:"subscriptions" yaml:"subscriptions"`
// QueueSize is how many payloads are accepted on the output channels to
// subscribers.
QueueSize int `json:"queuesize" yaml:"queuesize"`
// Handler describes the processor's handler in a form that can be
// loaded and saved.
Handler LoaderHandler `json:"loaderhandler" yaml:"handler"`
}
// LoaderHandler is a handler description that is easier to save and load
// than a live handler: the handler's name plus its stored configuration.
type LoaderHandler struct {
// Cfg is the stored handler configuration; ConvertToProcessor copies each of
// its Properties onto the handler obtained from the register.
// NOTE(review): this is a pointer, so it can be nil — presumably when the
// loaded file carries no configs entry; confirm against the YAML decoder.
Cfg *property.Configuration `json:"configs" yaml:"configs"`
// Name is the name ConvertToProcessor passes to register.GetHandler to
// obtain the handler.
Name string `json:"handler" yaml:"handler_name"`
}
// ConvertToProcessor rebuilds a runnable Processor from the loaded
// description.
//
// It creates the processor with NewProcessor from the stored name and topics,
// copies QueueSize and Workers (a Workers value of 0 becomes 1), fetches the
// handler registered under Handler.Name, applies every stored property to the
// handler's configuration, validates that configuration, re-subscribes to the
// stored subscriptions and, when Running is set, starts the processor with a
// background context.
//
// The first error met along the way is returned together with a nil
// Processor. A nil receiver and a handler configuration that fails validation
// are both reported as errors instead of panicking or being ignored. A
// description without a stored configuration is accepted: no properties are
// applied and validation decides whether the handler is usable as is.
func (la *LoaderProccessor) ConvertToProcessor() (*Processor, error) {
	// Load passes along whatever the decoder produced, which may include nil
	// entries; fail cleanly instead of dereferencing a nil receiver.
	if la == nil {
		return nil, errors.New("cannot convert a nil LoaderProccessor")
	}

	// Load all Processor settings: name, topics, queue size and workers.
	p := NewProcessor(la.Name, la.Topics...)
	p.QueueSize = la.QueueSize
	// Workers defaults to 1 when unconfigured.
	p.Workers = la.Workers
	if p.Workers == 0 {
		p.Workers = 1
	}

	// Get a new handler from the register.
	handler, err := register.GetHandler(la.Handler.Name)
	if err != nil {
		return nil, err
	}
	p.Handler = handler

	// Apply the stored properties. Cfg is a pointer and may be nil, in which
	// case there is nothing to apply.
	cfg := p.Handler.GetConfiguration()
	if la.Handler.Cfg != nil {
		for _, loadcfg := range la.Handler.Cfg.Properties {
			if err := cfg.SetProperty(loadcfg.Name, loadcfg.Value); err != nil {
				return nil, err
			}
		}
	}

	// A failed validation is always an error; the first reported problem is
	// used as the message when one is available.
	if ok, problems := p.Handler.ValidateConfiguration(); !ok {
		if len(problems) > 0 {
			return nil, errors.New(problems[0])
		}
		return nil, errors.New("handler " + la.Handler.Name + " has an invalid configuration")
	}

	// Resubscribe to the stored subscriptions.
	if err := p.Subscribe(la.Subscriptions...); err != nil {
		return nil, err
	}

	// Restart the processor if it was running when it was saved.
	if la.Running {
		if err := p.Start(context.Background()); err != nil {
			return nil, err
		}
	}

	// TODO: the applied FailureHandler is not restored. Does FailureHandler
	// need a GetName as well, and maybe a RegisterFailureHandler?
	return p, nil
}