Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add log_key to firehose plugin #33

Merged
merged 2 commits into from
Jul 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit:
* `region`: The region which your Firehose delivery stream(s) is/are in.
* `delivery_stream`: The name of the delivery stream that you want log records sent to.
* `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited.
* `log_key`: By default, the whole log record will be sent to Firehose. If you specify a key name with this option, then only the value of that key will be sent to Firehose. For example, if you are using the Fluentd Docker log driver, you can specify `log_key log` and only the log message will be sent to Firehose.
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `endpoint`: Specify a custom endpoint for the Kinesis Firehose API.
* `sts_endpoint`: Specify a custom endpoint for the STS API; used to assume your custom role provided with `role_arn`.
Expand Down
18 changes: 16 additions & 2 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type OutputPlugin struct {
dataKeys string
timeKey string
fmtStrftime *strftime.Strftime
logKey string
client PutRecordBatcher
records []*firehose.Record
dataLength int
Expand All @@ -65,7 +66,7 @@ type OutputPlugin struct {
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint)
if err != nil {
return nil, err
Expand Down Expand Up @@ -104,6 +105,7 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint
timer: timer,
timeKey: timeKey,
fmtStrftime: timeFormatter,
logKey: logKey,
PluginID: pluginID,
}, nil
}
Expand Down Expand Up @@ -204,7 +206,19 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([
}

var json = jsoniter.ConfigCompatibleWithStandardLibrary
data, err := json.Marshal(record)
var data []byte

if output.logKey != "" {
log, err := plugins.LogKey(record, output.logKey)
if err != nil {
return nil, err
}

data, err = plugins.EncodeLogKey(log)
} else {
data, err = json.Marshal(record)
}

if err != nil {
logrus.Debugf("[firehose %d] Failed to marshal record: %v\n", output.PluginID, record)
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion fluent-bit-firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,14 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)
logKey := output.FLBPluginConfigKey(ctx, "log_key")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the README file adding a line which describes the log_key field.

logrus.Infof("[firehose %d] plugin parameter log_key = '%s'\n", pluginID, logKey)

if deliveryStream == "" || region == "" {
return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID)
}

return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, pluginID)
return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, pluginID)
}

//export FLBPluginInit
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down