-
Notifications
You must be signed in to change notification settings - Fork 133
/
fluent_bit_init_process.go
388 lines (321 loc) · 12 KB
/
fluent_bit_init_process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
package main
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"reflect"
"regexp"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/arn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/sirupsen/logrus"
)
// static paths
const (
s3FileDirectoryPath = "/init/fluent-bit-init-s3-files/"
mainConfigFile = "/init/fluent-bit-init.conf"
originalMainConfigFile = "/fluent-bit/etc/fluent-bit.conf"
invokeFile = "/init/invoke_fluent_bit.sh"
)
var (
// default Fluent Bit command
baseCommand = "exec /fluent-bit/bin/fluent-bit -e /fluent-bit/firehose.so -e /fluent-bit/cloudwatch.so -e /fluent-bit/kinesis.so"
// global s3 client and flag
s3Client *s3.S3
s3ClientCreated bool = false
// global ecs metadata region
metadataRegion string = ""
)
// HTTPClient interface
type HTTPClient interface {
Get(url string) (*http.Response, error)
}
// S3Downloader interface
type S3Downloader interface {
Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*s3manager.Downloader)) (int64, error)
}
// all values in the structure are empty strings by default
type ECSTaskMetadata struct {
AWS_REGION string `json:"AWSRegion"`
ECS_CLUSTER string `json:"Cluster"` // Cluster name
ECS_TASK_ARN string `json:"TaskARN"`
ECS_TASK_ID string `json:"TaskID"`
ECS_FAMILY string `json:"Family"`
ECS_LAUNCH_TYPE string `json:"LaunchType"` // Task launch type will be an empty string if container agent is under version 1.45.0
ECS_REVISION string `json:"Revision"` // Revision number
ECS_TASK_DEFINITION string `json:"TaskDefinition"` // TaskDefinition = "family:revision"
}
// get ECS Task Metadata via endpoint V4
func getECSTaskMetadata(httpClient HTTPClient) ECSTaskMetadata {
var metadata ECSTaskMetadata
ecsTaskMetadataEndpointV4 := os.Getenv("ECS_CONTAINER_METADATA_URI_V4")
if ecsTaskMetadataEndpointV4 == "" {
logrus.Warnln("[FluentBit Init Process] Unable to get ECS Metadata, ignore this warning if not running on ECS")
return metadata
}
res, err := httpClient.Get(ecsTaskMetadataEndpointV4 + "/task")
if err != nil {
logrus.Fatalf("[FluentBit Init Process] Failed to get ECS Metadata via HTTP Get: %s\n", err)
}
response, err := ioutil.ReadAll(res.Body)
if err != nil {
logrus.Fatalf("[FluentBit Init Process] Failed to read ECS Metadata from HTTP response: %s\n", err)
}
res.Body.Close()
err = json.Unmarshal(response, &metadata)
if err != nil {
logrus.Fatalf("[FluentBit Init Process] Failed to unmarshal ECS metadata: %s\n", err)
}
arn, err := arn.Parse(metadata.ECS_TASK_ARN)
if err != nil {
logrus.Fatalf("[FluentBit Init Process] Failed to parse ECS TaskARN: %s\n", err)
}
resourceID := strings.Split(arn.Resource, "/")
taskID := resourceID[len(resourceID)-1]
metadata.ECS_TASK_ID = taskID
metadata.AWS_REGION = arn.Region
metadata.ECS_TASK_DEFINITION = metadata.ECS_FAMILY + ":" + metadata.ECS_REVISION
// set global ecs metadata region for S3 client
metadataRegion = reflect.ValueOf(metadata).Field(0).Interface().(string)
return metadata
}
// set ECS Task Metadata as environment variables in the invoke_fluent_bit.sh
func setECSTaskMetadata(metadata ECSTaskMetadata, filePath string) {
invokeFile := openFile(filePath)
defer invokeFile.Close()
// set the FLB_AWS_USER_AGENT env var as "init" to get the image usage
initUsage := "export FLB_AWS_USER_AGENT=ecs-init\n"
_, err := invokeFile.WriteString(initUsage)
if err != nil {
logrus.Errorln(err)
logrus.Warnf("[FluentBit Init Process] Cannot write %s in the invoke_fluent_bit.sh\n", initUsage[:len(initUsage)-2])
}
t := reflect.TypeOf(metadata)
v := reflect.ValueOf(metadata)
for i := 0; i < t.NumField(); i++ {
if v.Field(i).Interface().(string) == "" {
continue
}
writeContent := "export " + t.Field(i).Name + "=" + v.Field(i).Interface().(string) + "\n"
_, err := invokeFile.WriteString(writeContent)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot write %s in the invoke_fluent_bit.sh\n", writeContent[:len(writeContent)-2])
}
}
}
// create Fluent Bit command to use "-c" to specify the new main config file
func createCommand(command *string, filePath string) {
*command = *command + " -c " + filePath
}
// get our built in config files or files from s3
// process built-in config files directly
// add S3 config files to directory "/init/fluent-bit-init-s3-files/"
func getAllConfigFiles() {
// get all env vars in the container
envs := os.Environ()
// find all env vars match specified prefix
for _, env := range envs {
var envKey string
var envValue string
env_kv := strings.SplitN(env, "=", 2)
if len(env_kv) != 2 {
logrus.Fatalf("[FluentBit Init Process] Unrecognizable environment variables: %s\n", env)
}
envKey = string(env_kv[0])
envValue = string(env_kv[1])
s3_regex, _ := regexp.Compile("aws_fluent_bit_init_[sS]3")
file_regex, _ := regexp.Compile("aws_fluent_bit_init_[fF]ile")
matched_s3 := s3_regex.MatchString(envKey)
matched_file := file_regex.MatchString(envKey)
// if this env var's value is an arn, download the config file first, then process it
if matched_s3 {
s3FilePath := getS3ConfigFile(envValue)
s3FileName := strings.SplitN(s3FilePath, "/", -1)
processConfigFile(s3FileDirectoryPath + s3FileName[len(s3FileName)-1])
}
// if this env var's value is a path of our built-in config file, process is derectly
if matched_file {
processConfigFile(envValue)
}
}
}
func processConfigFile(path string) {
contentBytes, err := ioutil.ReadFile(path)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot open file: %s\n", path)
}
content := string(contentBytes)
if strings.Contains(content, "[PARSER]") {
// this is a parser config file, change command
updateCommand(path)
} else {
// this is not a parser config file. @INCLUDE
writeInclude(path, mainConfigFile)
}
}
func getS3ConfigFile(arn string) string {
// Preparation for downloading S3 config files
if !s3ClientCreated {
createS3Client()
}
// e.g. "arn:aws:s3:::user-bucket/s3_parser.conf"
arnBucketFile := arn[13:]
bucketAndFile := strings.SplitN(arnBucketFile, "/", 2)
if len(bucketAndFile) != 2 {
logrus.Fatalf("[FluentBit Init Process] Unrecognizable arn: %s\n", arn)
}
bucketName := bucketAndFile[0]
s3FilePath := bucketAndFile[1]
// get bucket region
input := &s3.GetBucketLocationInput{
Bucket: aws.String(bucketName),
}
output, err := s3Client.GetBucketLocation(input)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot get bucket region of %s + %s, you must be the bucket owner to implement this operation\n", bucketName, s3FilePath)
}
bucketRegion := aws.StringValue(output.LocationConstraint)
// Buckets in Region us-east-1 have a LocationConstraint of null
// https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#GetBucketLocationOutput
if bucketRegion == "" {
bucketRegion = "us-east-1"
}
// create a downloader
s3Downloader := createS3Downloader(bucketRegion)
// download file from S3 and store in the directory "/init/fluent-bit-init-s3-files/"
downloadS3ConfigFile(s3Downloader, s3FilePath, bucketName, s3FileDirectoryPath)
return s3FilePath
}
// create a S3 client as the global S3 client for reuse
func createS3Client() {
region := "us-east-1"
if metadataRegion != "" {
region = metadataRegion
}
s3Client = s3.New(session.Must(session.NewSession(&aws.Config{
// if not specify region here, missingregion error will raise when get bucket location
Region: aws.String(region),
})))
s3ClientCreated = true
}
func createS3Downloader(bucketRegion string) S3Downloader {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(bucketRegion)},
)
if err != nil {
logrus.Errorln(err)
logrus.Fatalln("[FluentBit Init Process] Cannot creat a new session")
}
// need to specify session region!
s3Downloader := s3manager.NewDownloader(sess)
return s3Downloader
}
func downloadS3ConfigFile(s3Downloader S3Downloader, s3FilePath, bucketName, s3FileDirectory string) {
s3FileName := strings.SplitN(s3FilePath, "/", -1)
fileFromS3 := createFile(s3FileDirectory+s3FileName[len(s3FileName)-1], false)
defer fileFromS3.Close()
_, err := s3Downloader.Download(fileFromS3,
&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(s3FilePath),
})
if err != nil {
logrus.Warnf("[FluentBit Init Process] Cannot download %s from s3, retrying...\n", s3FileName)
_, error := s3Downloader.Download(fileFromS3,
&s3.GetObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(s3FilePath),
})
if error != nil {
logrus.Errorln(error)
logrus.Fatalf("[FluentBit Init Process] Cannot download %s from s3\n", s3FileName)
}
}
}
// use @INCLUDE to add config files to the main config file
func writeInclude(configFilePath, mainConfigFilePath string) {
mainConfigFile := openFile(mainConfigFilePath)
defer mainConfigFile.Close()
writeContent := "@INCLUDE " + configFilePath + "\n"
_, err := mainConfigFile.WriteString(writeContent)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot write %s in main config file: %s\n", writeContent[:len(writeContent)-2], mainConfigFilePath)
}
}
// change the fluent bit cammand to use "-R" to specift Parser config file
func updateCommand(parserFilePath string) {
baseCommand = baseCommand + " -R " + parserFilePath
logrus.Infoln("[FluentBit Init Process] Command is change to -> " + baseCommand)
}
// change the invoke_fluent_bit.sh
// which will declare ECS Task Metadata as environment variables
// and finally invoke Fluent Bit
func modifyInvokeFile(filePath string) {
invokeFile := openFile(filePath)
defer invokeFile.Close()
_, err := invokeFile.WriteString(baseCommand)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot write %s in invoke_fluent_bit.sh\n", baseCommand)
}
}
// create a file, when flag is true, the file will be closed automatically after creation
func createFile(filePath string, AutoClose bool) *os.File {
if err := os.MkdirAll(filepath.Dir(filePath), 0700); err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot create the Directory: %s\n", filepath.Dir(filePath))
}
file, err := os.Create(filePath)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Cannot create the file: %s\n", filePath)
}
if AutoClose {
defer file.Close()
}
return file
}
func openFile(filePath string) *os.File {
file, err := os.OpenFile(filePath, os.O_APPEND|os.O_WRONLY, 0700)
if err != nil {
logrus.Errorln(err)
logrus.Fatalf("[FluentBit Init Process] Unable to read %s\n", filePath)
}
return file
}
func main() {
// create the invoke_fluent_bit.sh
// which will declare ECS Task Metadata as environment variables
// and finally invoke Fluent Bit
createFile(invokeFile, true)
// get ECS Task Metadata and set the region for S3 client
httpClient := &http.Client{}
metadata := getECSTaskMetadata(httpClient)
// set ECS Task Metada as env vars in the invoke_fluent_bit.sh
setECSTaskMetadata(metadata, invokeFile)
// create main config file which will be used invoke Fluent Bit
createFile(mainConfigFile, true)
// add @INCLUDE in main config file to include original main config file
writeInclude(originalMainConfigFile, mainConfigFile)
// create Fluent Bit command to use "-c" to specify new main config file
createCommand(&baseCommand, mainConfigFile)
// get our built in config files or files from s3
// process built-in config files directly
// add S3 config files to directory "/init/fluent-bit-init-s3-files/"
getAllConfigFiles()
// modify invoke_fluent_bit.sh, invoke fluent bit
// this function will be called at the end
// any error appear above will cause exit this process,
// will not write Fluent Bit command in the finvoke_fluent_bit.sh so Fluent Bit will not be invoked
modifyInvokeFile(invokeFile)
}