
Add an option to send multiple log events as a record #105

Merged
merged 1 commit on Mar 5, 2021

Conversation

@DrewZhang13 (Contributor) commented Mar 5, 2021

Signed-off-by: Drew Zhang [email protected]

Issue #: #12

Description of changes:
Add an option to allow multiple log events to be added to the same record, as long as the maxRecordSizeLimit (1 MiB) is not exceeded.
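
For reference, here is a minimal fluent-bit.conf sketch that enables the new option. This is an illustrative example rather than a config from the PR itself: the region and delivery stream mirror the run output below, and everything other than `simple_aggregation true` is ordinary firehose output configuration.

    [OUTPUT]
        Name               firehose
        Match              *
        region             us-west-2
        delivery_stream    aws-fluent-bit-delivery
        simple_aggregation true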

Run output:

[ec2-user@ip-172-31-43-210 bin]$ ./fluent-bit -e ./firehose.so -c ~/conf/fluent-bit.conf 
Fluent Bit v1.8.0
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/03/05 19:32:59] [ info] [engine] started (pid=3702)
[2021/03/05 19:32:59] [ info] [storage] version=1.1.1, initializing...
[2021/03/05 19:32:59] [ info] [storage] in-memory
[2021/03/05 19:32:59] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
INFO[0000] A new higher performance Firehose plugin has been released; you are using the old plugin. Check out the new plugin's documentation and consider migrating.
https://docs.fluentbit.io/manual/pipeline/outputs/firehose 
INFO[0000] [firehose 0] plugin parameter delivery_stream = 'aws-fluent-bit-delivery' 
INFO[0000] [firehose 0] plugin parameter region = 'us-west-2' 
INFO[0000] [firehose 0] plugin parameter data_keys = '' 
INFO[0000] [firehose 0] plugin parameter role_arn = ''  
INFO[0000] [firehose 0] plugin parameter endpoint = ''  
INFO[0000] [firehose 0] plugin parameter sts_endpoint = '' 
INFO[0000] [firehose 0] plugin parameter time_key = ''  
INFO[0000] [firehose 0] plugin parameter time_key_format = '' 
INFO[0000] [firehose 0] plugin parameter log_key = ''   
INFO[0000] [firehose 0] plugin parameter replace_dots = '_' 
INFO[0000] [firehose 0] plugin parameter simple_aggregation = 'true' 
[2021/03/05 19:32:59] [ info] [sp] stream processor started

Unit tests:

[ec2-user@ip-172-31-43-210 amazon-kinesis-firehose-for-fluent-bit]$ make test
go test -timeout=120s -v -cover ./...
?   	github.com/aws/amazon-kinesis-firehose-for-fluent-bit	[no test files]
=== RUN   TestAddRecord
--- PASS: TestAddRecord (0.00s)
=== RUN   TestAddRecordWithMultipleLogEventsPerRecordEnable
--- PASS: TestAddRecordWithMultipleLogEventsPerRecordEnable (0.00s)
=== RUN   TestTruncateLargeLogEvent
time="2021-03-05T17:12:17Z" level=warning msg="[firehose 0] Found record with 6144045 bytes, truncating to 1000Kib, stream=stream\n"
time="2021-03-05T17:12:17Z" level=warning msg="[firehose 0] Found record with 6144045 bytes, truncating to 1000Kib, stream=stream\n"
--- PASS: TestTruncateLargeLogEvent (0.05s)
=== RUN   TestAddRecordAndFlush
--- PASS: TestAddRecordAndFlush (0.00s)
=== RUN   TestSendCurrentBatch
--- PASS: TestSendCurrentBatch (0.00s)
=== RUN   TestDotReplace
--- PASS: TestDotReplace (0.00s)
PASS
coverage: 33.6% of statements
ok  	github.com/aws/amazon-kinesis-firehose-for-fluent-bit/firehose	(cached)	coverage: 33.6% of statements
?   	github.com/aws/amazon-kinesis-firehose-for-fluent-bit/firehose/mock_firehose	[no test files]
=== RUN   TestDecodeMap
--- PASS: TestDecodeMap (0.00s)
=== RUN   TestDataKeys
--- PASS: TestDataKeys (0.00s)
=== RUN   TestGetBoolParam
--- PASS: TestGetBoolParam (0.00s)
PASS
coverage: 38.8% of statements
ok  	github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins	(cached)	coverage: 38.8% of statements

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@DrewZhang13 DrewZhang13 requested a review from a team as a code owner March 5, 2021 17:41
README.md Outdated
@@ -31,7 +31,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit:
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.

* `multiple_log_events_per_record`: Option to allow the plugin to send multiple log events in the same record if the current record does not exceed the maximumRecordSize(1MiB). Defaults to `false`; set to `true` to enable this option.
Contributor

Let's call this option simple_aggregation. And then the description should note that it joins together as many log records as possible into a single Firehose record and delimits them with a newline.

Contributor

Also, maybe add that they should consider increasing the Fluent Bit flush interval in the service section, to increase the number of logs sent per flush (and get better aggregation).
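
As an illustrative sketch of that suggestion (the `Flush 5` value is a made-up example, not a recommendation from this thread), the service section might look like:

    [SERVICE]
        # Flushing less frequently lets more log events accumulate per flush,
        # so simple_aggregation can pack more events into each record.
        Flush 5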

Contributor

Wondering why this can't be enabled by default. Why do we need a setting? What's the complexity if we make this the default behavior?

Contributor

Setting it by default would break backwards compatibility with the old behavior...

Contributor

Also: this option is only for some customers/use cases. If the log events are all joined together into one record, that only works for some Firehose destinations, like S3. For others, like Elasticsearch, concatenating events will break the experience.

Contributor

Maybe I am missing something: why would it break the existing experience? All the data is still being sent, just in a larger chunk (multiple events in one record). Isn't how we send the data an implementation detail?

Contributor

Oh I see. I missed your previous comment. Makes sense.

Contributor Author

Updated to simple_aggregation everywhere.

README.md Outdated
@@ -31,7 +31,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit:
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.

* `simple_aggregation`: Option to allow the plugin to send multiple log events in the same record if the current record does not exceed the maximumRecordSize(1MiB). It joins together as many log records as possible into a single Firehose record and delimits them with a newline. Defaults to `false`; set to `true` to enable this option.
Contributor

nit: add a space here: maximumRecordSize(1MiB)

Contributor

Do we want to mention that this option is good to enable if you are sending data to a destination that supports aggregation, like S3?

Contributor Author

Where should the space be added? The comment looks the same as the code change. Do you mean (1 MiB)?

Contributor

Sorry I messed it up: maximumRecordSize (1 MiB)

@DrewZhang13 (Contributor Author), Mar 5, 2021

Thanks! Updated, and also mentioned the suggestion.

Data: data,
})
if output.simpleAggregation && len(output.records) > 0 && len(output.records[len(output.records)-1].Data) + newDataSize <= maximumRecordSize {
output.records[len(output.records)-1].Data = append(output.records[len(output.records)-1].Data, data...)
Contributor

where are you adding the newline??

@DrewZhang13 (Contributor Author), Mar 5, 2021

For this part, the newline is added in processRecord on this line, as it was before; I didn't change that.
The current output looks like:

{"cpu0_p_cpu":0,"cpu0_p_system":0,"cpu0_p_user":0,"cpu1_p_cpu":0,"cpu1_p_system":0,"cpu1_p_user":0,"cpu2_p_cpu":0,"cpu2_p_system":0,"cpu2_p_user":0}
{"cpu0_p_cpu":0,"cpu0_p_system":0,"cpu0_p_user":0,"cpu1_p_cpu":0,"cpu1_p_system":0,"cpu1_p_user":0,"cpu2_p_cpu":0,"cpu2_p_system":0,"cpu2_p_user":0}

I wonder if this is the correct output, or whether an extra newline is expected, like this:

{"cpu0_p_cpu":0,"cpu0_p_system":0,"cpu0_p_user":0,"cpu1_p_cpu":0,"cpu1_p_system":0,"cpu1_p_user":0,"cpu2_p_cpu":0,"cpu2_p_system":0,"cpu2_p_user":0}

{"cpu0_p_cpu":0,"cpu0_p_system":0,"cpu0_p_user":0,"cpu1_p_cpu":0,"cpu1_p_system":0,"cpu1_p_user":0,"cpu2_p_cpu":0,"cpu2_p_system":0,"cpu2_p_user":0}

Contributor

I see, you're good. We don't want two newlines.
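
To summarize the behavior settled on in this thread, here is a simplified, self-contained Go sketch of the append path. The `output` and `record` types and the `addData` name are paraphrased for illustration, not the exact plugin source, and each incoming event is assumed to have been newline-terminated earlier by processRecord, as noted above.

    package main

    import "fmt"

    // Firehose's per-record limit discussed in this thread: 1 MiB.
    const maximumRecordSize = 1024 * 1024

    type record struct {
        Data []byte
    }

    type output struct {
        simpleAggregation bool
        records           []record
    }

    // addData appends one processed log event. Because processRecord has
    // already newline-terminated the event, plain concatenation yields one
    // event per line with no blank lines in between.
    func (o *output) addData(data []byte) {
        last := len(o.records) - 1
        if o.simpleAggregation && last >= 0 &&
            len(o.records[last].Data)+len(data) <= maximumRecordSize {
            o.records[last].Data = append(o.records[last].Data, data...)
            return
        }
        o.records = append(o.records, record{Data: data})
    }

    func main() {
        o := &output{simpleAggregation: true}
        o.addData([]byte("{\"msg\":\"a\"}\n"))
        o.addData([]byte("{\"msg\":\"b\"}\n"))
        // Both events land in a single record: records=1 bytes=24
        fmt.Printf("records=%d bytes=%d\n", len(o.records), len(o.records[0].Data))
    }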

@hossain-rayhan (Contributor)

nit: maybe you can update the screenshot in the PR description as you changed the field name.

@DrewZhang13 (Contributor Author)

> nit: maybe you can update the screenshot in the PR description as you changed the field name.

Thanks! Updated.
