Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_cloudwatch_logs: bug fix: self throttle if too many calls per flush #2618

Merged
merged 1 commit into from
Oct 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include <msgpack.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>

#include "cloudwatch_api.h"

Expand Down Expand Up @@ -991,6 +992,17 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
flb_sds_t error;
int num_headers = 1;

buf->put_events_calls++;

if (buf->put_events_calls >= 4) {
/*
* In normal execution, even under high throughput, 4+ calls per flush
* should be extremely rare. This is needed for edge cases basically.
*/
flb_plg_debug(ctx->ins, "Too many calls this flush, sleeping for 250 ms");
usleep(250000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this function put_log_events is under a flush() invocation/coroutine, pls use flb_time_sleep(a, b) instead of usleep(). The proposed function does an async sleep (non-blocking)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhhh... I'm not sure this will work actually... this plugin is meant to be entirely synchronous- the code was written under that assumption. This is because the CW Logs API requires Puts to a stream to be synchronous. (Long term, if we introduce some more complicated concurrency mechanisms in Fluent Bit, we can make it concurrent).

A CW Logs Put request is 1 MB, and there's a few buffers the plugin needs each flush in order to process the logs and construct the payload. Since I knew the plugin was going to be synchronous, I just allocate these buffers in init one time and re-use them in every flush. I did some benchmarking and this made it way faster; allocating memory in each flush slowed things down. Anyway- this means that any introduction of concurrency could break the code.

I think I prefer to keep the code as is, because as stated in the comment, this is an edge/rare case. The synchronous sleep here should not be triggered very often in the real world.

}

flb_plg_debug(ctx->ins, "Sending log events to log stream %s", stream->name);

/* stream is being used, update expiration */
Expand Down
2 changes: 2 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,8 @@ static void cb_cloudwatch_flush(const void *data, size_t bytes,
(void) i_ins;
(void) config;

ctx->buf->put_events_calls = 0;

if (ctx->create_group == FLB_TRUE && ctx->group_created == FLB_FALSE) {
ret = create_log_group(ctx);
if (ret < 0) {
Expand Down
11 changes: 11 additions & 0 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ struct cw_flush {
/* buffer used to temporarily hold an event during processing */
char *event_buf;
size_t event_buf_size;

/*
* According to the docs:
* PutLogEvents: 5 requests per second per log stream.
* Additional requests are throttled. This quota can't be changed.
* This plugin fast. A single flush might make more than 5 calls,
* Then fail, then retry, then be too fast again, on and on.
* I have seen this happen.
* So we throttle ourselves if more than 5 calls are made per flush
*/
int put_events_calls;
};

struct cw_event {
Expand Down