Skip to content

Commit

Permalink
out_cloudwatch_logs: Only create log group if it does not already exi…
Browse files Browse the repository at this point in the history
…st to prevent throttling

Signed-off-by: Wesley Pettit <[email protected]>
  • Loading branch information
PettitWesley committed Feb 16, 2022
1 parent 46ea2bd commit 2860afe
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 12 deletions.
37 changes: 34 additions & 3 deletions plugins/out_cloudwatch_logs/cloudwatch_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@

#define ERR_CODE_ALREADY_EXISTS "ResourceAlreadyExistsException"
#define ERR_CODE_INVALID_SEQUENCE_TOKEN "InvalidSequenceTokenException"
#define ERR_CODE_NOT_FOUND "ResourceNotFoundException"


#define AMZN_REQUEST_ID_HEADER "x-amzn-RequestId"

Expand Down Expand Up @@ -1041,7 +1043,7 @@ struct log_stream *get_dynamic_log_stream(struct flb_cloudwatch *ctx,
}
new_stream->name = name;

ret = create_log_stream(ctx, new_stream);
ret = create_log_stream(ctx, new_stream, FLB_TRUE);
if (ret < 0) {
log_stream_destroy(new_stream);
return NULL;
Expand All @@ -1061,7 +1063,7 @@ struct log_stream *get_log_stream(struct flb_cloudwatch *ctx,
if (ctx->log_stream_name) {
stream = &ctx->stream;
if (ctx->stream_created == FLB_FALSE) {
ret = create_log_stream(ctx, stream);
ret = create_log_stream(ctx, stream, FLB_TRUE);
if (ret < 0) {
return NULL;
}
Expand Down Expand Up @@ -1235,14 +1237,16 @@ int create_log_group(struct flb_cloudwatch *ctx)
return -1;
}

int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream)
int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream,
int can_retry)
{

struct flb_http_client *c = NULL;
struct flb_aws_client *cw_client;
flb_sds_t body;
flb_sds_t tmp;
flb_sds_t error;
int ret;

flb_plg_info(ctx->ins, "Creating log stream %s in log group %s",
stream->name, ctx->log_group);
Expand Down Expand Up @@ -1301,6 +1305,33 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream)
flb_http_client_destroy(c);
return 0;
}

if (strcmp(error, ERR_CODE_NOT_FOUND) == 0) {
flb_sds_destroy(body);
flb_sds_destroy(error);
flb_http_client_destroy(c);

if (ctx->create_group == FLB_TRUE) {
flb_plg_info(ctx->ins, "Log Group %s not found. Will attempt to create it.",
ctx->log_group);
ret = create_log_group(ctx);
if (ret < 0) {
return -1;
} else {
if (can_retry == FLB_TRUE) {
/* retry stream creation */
return create_log_stream(ctx, stream, FLB_FALSE);
} else {
/* we failed to create the stream */
return -1;
}
}
} else {
flb_plg_error(ctx->ins, "Log Group %s not found and `auto_create_group` disabled.",
ctx->log_group);
}
return -1;
}
/* some other error occurred; notify user */
flb_aws_print_error(c->resp.payload, c->resp.payload_size,
"CreateLogStream", ctx->ins);
Expand Down
2 changes: 1 addition & 1 deletion plugins/out_cloudwatch_logs/cloudwatch_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void cw_flush_destroy(struct cw_flush *buf);
int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, struct cw_flush *buf,
struct log_stream *stream,
const char *data, size_t bytes);
int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream);
int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry);
struct log_stream *get_log_stream(struct flb_cloudwatch *ctx,
const char *tag, int tag_len);
int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf,
Expand Down
10 changes: 2 additions & 8 deletions plugins/out_cloudwatch_logs/cloudwatch_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,8 @@ static void cb_cloudwatch_flush(const void *data, size_t bytes,

ctx->buf->put_events_calls = 0;

if (ctx->create_group == FLB_TRUE && ctx->group_created == FLB_FALSE) {
ret = create_log_group(ctx);
if (ret < 0) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
}

stream = get_log_stream(ctx, tag, tag_len);
stream = get_log_stream(ctx,
event_chunk->tag, flb_sds_len(event_chunk->tag));
if (!stream) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
Expand Down

0 comments on commit 2860afe

Please sign in to comment.