From 6c1d286f472e39c057c490378a9745c89b1ffe02 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 17 Jul 2020 01:43:10 -0700 Subject: [PATCH 1/2] Add log_key to firehose plugin --- firehose/firehose.go | 18 ++++++++++++++++-- fluent-bit-firehose.go | 4 +++- go.sum | 1 + 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/firehose/firehose.go b/firehose/firehose.go index 1af01f0..e19f176 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -57,6 +57,7 @@ type OutputPlugin struct { dataKeys string timeKey string fmtStrftime *strftime.Strftime + logKey string client PutRecordBatcher records []*firehose.Record dataLength int @@ -65,7 +66,7 @@ type OutputPlugin struct { } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, pluginID int) (*OutputPlugin, error) { client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint) if err != nil { return nil, err @@ -104,6 +105,7 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint timer: timer, timeKey: timeKey, fmtStrftime: timeFormatter, + logKey: logKey, PluginID: pluginID, }, nil } @@ -204,7 +206,19 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([ } var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) + var data []byte + + if output.logKey != "" { + log, err := plugins.LogKey(record, output.logKey) + if err != nil { + return nil, err + } + + data, err = plugins.EncodeLogKey(log) + } else { + data, err = json.Marshal(record) + } + if err != nil { logrus.Debugf("[firehose %d] Failed to marshal record: %v\n", output.PluginID, record) return nil, err diff --git a/fluent-bit-firehose.go b/fluent-bit-firehose.go index df74670..b4cbb30 100644 --- a/fluent-bit-firehose.go +++ b/fluent-bit-firehose.go @@ -74,12 +74,14 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey) timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format") logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt) + logKey := output.FLBPluginConfigKey(ctx, "log_key") + logrus.Infof("[firehose %d] plugin parameter log_key = '%s'\n", pluginID, logKey) if deliveryStream == "" || region == "" { return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID) } - return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, pluginID) + return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, pluginID) } //export FLBPluginInit diff --git a/go.sum b/go.sum index ba80388..776a9f3 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,7 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= From 54c3d7ded2b77da06b4ecb5fd992ee93a1eb3928 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 21 Jul 2020 10:27:36 -0700 Subject: [PATCH 2/2] Add log_key to readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 09dded1..765484d 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit: * `region`: The region which your Firehose delivery stream(s) is/are in. * `delivery_stream`: The name of the delivery stream that you want log records sent to. * `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited. +* `log_key`: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only the value of that key will be sent to Firehose. For example, if you are using the Fluentd Docker log driver, you can specify `log_key log` and only the log message will be sent to Firehose. * `role_arn`: ARN of an IAM role to assume (for cross account access). * `endpoint`: Specify a custom endpoint for the Kinesis Firehose API. * `sts_endpoint`: Specify a custom endpoint for the STS API; used to assume your custom role provided with `role_arn`.