diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.c b/plugins/out_cloudwatch_logs/cloudwatch_api.c index 824099c1ef0..e9875875aae 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.c @@ -209,7 +209,7 @@ static int init_put_payload(struct flb_cloudwatch *ctx, struct cw_flush *buf, } if (!try_to_write(buf->out_buf, offset, buf->out_buf_size, - ctx->log_group, 0)) { + stream->group, 0)) { goto error; } @@ -484,22 +484,22 @@ int process_event(struct flb_cloudwatch *ctx, struct cw_flush *buf, } /* Resets or inits a cw_flush struct */ -void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf, - struct log_stream *stream) { +void reset_flush_buf(struct flb_cloudwatch *ctx, struct cw_flush *buf) { buf->event_index = 0; buf->tmp_buf_offset = 0; buf->event_index = 0; buf->data_size = PUT_LOG_EVENTS_HEADER_LEN + PUT_LOG_EVENTS_FOOTER_LEN; - buf->data_size += strlen(stream->name); - buf->data_size += strlen(ctx->log_group); - if (stream->sequence_token) { - buf->data_size += strlen(stream->sequence_token); + if (buf->current_stream != NULL) { + buf->data_size += strlen(buf->current_stream->name); + buf->data_size += strlen(buf->current_stream->group); + if (buf->current_stream->sequence_token) { + buf->data_size += strlen(buf->current_stream->sequence_token); + } } } /* sorts events, constructs a put payload, and then sends */ -int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, - struct log_stream *stream) { +int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf) { int ret; int offset; int i; @@ -513,11 +513,11 @@ int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, qsort(buf->events, buf->event_index, sizeof(struct cw_event), compare_events); retry: - stream->newest_event = 0; - stream->oldest_event = 0; + buf->current_stream->newest_event = 0; + buf->current_stream->oldest_event = 0; offset = 0; - ret = init_put_payload(ctx, buf, stream, &offset); + ret = init_put_payload(ctx, buf, buf->current_stream, &offset); if (ret < 0) { flb_plg_error(ctx->ins, "Failed to initialize PutLogEvents payload"); return -1; @@ -547,7 +547,7 @@ int send_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, } flb_plg_debug(ctx->ins, "cloudwatch:PutLogEvents: events=%d, payload=%d bytes", i, offset); - ret = put_log_events(ctx, buf, stream, (size_t) offset); + ret = put_log_events(ctx, buf, buf->current_stream, (size_t) offset); if (ret < 0) { flb_plg_error(ctx->ins, "Failed to send log events"); return -1; @@ -571,13 +571,20 @@ int add_event(struct flb_cloudwatch *ctx, struct cw_flush *buf, int retry_add = FLB_FALSE; int event_bytes = 0; - if (buf->event_index == 0) { - /* init */ - reset_flush_buf(ctx, buf, stream); + if (buf->event_index > 0 && buf->current_stream != stream) { + /* we already have events for a different stream, send them first */ + retry_add = FLB_TRUE; + goto send; } retry_add_event: + buf->current_stream = stream; retry_add = FLB_FALSE; + if (buf->event_index == 0) { + /* init */ + reset_flush_buf(ctx, buf); + } + ret = process_event(ctx, buf, obj, tms); if (ret < 0) { return -1; @@ -632,8 +639,8 @@ int add_event(struct flb_cloudwatch *ctx, struct cw_flush *buf, return 0; send: - ret = send_log_events(ctx, buf, stream); - reset_flush_buf(ctx, buf, stream); + ret = send_log_events(ctx, buf); + reset_flush_buf(ctx, buf); if (ret < 0) { return -1; } @@ -797,7 +804,7 @@ int pack_emf_payload(struct flb_cloudwatch *ctx, * return value is the number of events processed */ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, - struct cw_flush *buf, struct log_stream *stream, + struct cw_flush *buf, flb_sds_t tag, const char *data, size_t bytes) { size_t off = 0; @@ -815,6 +822,8 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, /* msgpack::sbuffer is a simple buffer implementation. */ msgpack_sbuffer mp_sbuf; + struct log_stream *stream; + char *key_str = NULL; size_t key_str_size = 0; int j; @@ -859,6 +868,12 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, map = root.via.array.ptr[1]; map_size = map.via.map.size; + stream = get_log_stream(ctx, tag, map); + if (!stream) { + flb_plg_debug(ctx->ins, "Couldn't determine log group & stream for record with tag %s", tag); + goto error; + } + if (ctx->log_key) { key_str = NULL; key_str_size = 0; @@ -975,8 +990,8 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, msgpack_unpacked_destroy(&result); /* send any remaining events */ - ret = send_log_events(ctx, buf, stream); - reset_flush_buf(ctx, buf, stream); + ret = send_log_events(ctx, buf); + reset_flush_buf(ctx, buf); if (ret < 0) { return -1; } @@ -989,39 +1004,22 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, return -1; } - -struct log_stream *get_dynamic_log_stream(struct flb_cloudwatch *ctx, - const char *tag, int tag_len) +struct log_stream *get_or_create_log_stream(struct flb_cloudwatch *ctx, + flb_sds_t stream_name, + flb_sds_t group_name) { int ret; struct log_stream *new_stream; struct log_stream *stream; struct mk_list *tmp; struct mk_list *head; - flb_sds_t name = NULL; - flb_sds_t tmp_s = NULL; time_t now; - name = flb_sds_create(ctx->log_stream_prefix); - if (!name) { - flb_errno(); - return NULL; - } - - tmp_s = flb_sds_cat(name, tag, tag_len); - if (!tmp_s) { - flb_errno(); - flb_sds_destroy(name); - return NULL; - } - name = tmp_s; - /* check if the stream already exists */ now = time(NULL); mk_list_foreach_safe(head, tmp, &ctx->streams) { stream = mk_list_entry(head, struct log_stream, _head); - if (strcmp(name, stream->name) == 0) { - flb_sds_destroy(name); + if (strcmp(stream_name, stream->name) == 0 && strcmp(group_name, stream->group) == 0) { return stream; } else { @@ -1037,10 +1035,18 @@ struct log_stream *get_dynamic_log_stream(struct flb_cloudwatch *ctx, new_stream = flb_calloc(1, sizeof(struct log_stream)); if (!new_stream) { flb_errno(); - flb_sds_destroy(name); return NULL; } - new_stream->name = name; + new_stream->name = flb_sds_create(stream_name); + if (new_stream->name == NULL) { + flb_errno(); + return NULL; + } + new_stream->group = flb_sds_create(group_name); + if (new_stream->group == NULL) { + flb_errno(); + return NULL; + } ret = create_log_stream(ctx, new_stream, FLB_TRUE); if (ret < 0) { @@ -1053,29 +1059,78 @@ struct log_stream *get_dynamic_log_stream(struct flb_cloudwatch *ctx, return new_stream; } -struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, - const char *tag, int tag_len) +struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag, + const msgpack_object map) { + flb_sds_t group_name = NULL; + flb_sds_t stream_name = NULL; + flb_sds_t tmp_s = NULL; + int free_group = FLB_FALSE; + int free_stream = FLB_FALSE; struct log_stream *stream; - int ret; - if (ctx->log_stream_name) { - stream = &ctx->stream; - if (ctx->stream_created == FLB_FALSE) { - ret = create_log_stream(ctx, stream, FLB_TRUE); - if (ret < 0) { + /* templates take priority */ + if (ctx->ra_stream) { + stream_name = flb_ra_translate_check(ctx->ra_stream, tag, flb_sds_len(tag), + map, NULL, FLB_TRUE); + } + + if (ctx->ra_group) { + group_name = flb_ra_translate_check(ctx->ra_group, tag, flb_sds_len(tag), + map, NULL, FLB_TRUE); + } + + if (stream_name == NULL) { + if (ctx->stream_name) { + stream_name = ctx->stream_name; + } else { + free_stream = FLB_TRUE; + /* use log_stream_prefix */ + stream_name = flb_sds_create(ctx->log_stream_prefix); + if (!stream_name) { + flb_errno(); + if (group_name) { + flb_sds_destroy(group_name); + } return NULL; } - stream->expiration = time(NULL) + FOUR_HOURS_IN_SECONDS; - ctx->stream_created = FLB_TRUE; + + tmp_s = flb_sds_cat(stream_name, tag, flb_sds_len(tag)); + if (!tmp_s) { + flb_errno(); + flb_sds_destroy(stream_name); + if (group_name) { + flb_sds_destroy(group_name); + } + return NULL; + } + stream_name = tmp_s; } - return stream; + } else { + free_stream = FLB_TRUE; } + + if (group_name == NULL) { + group_name = ctx->group_name; + } else { + free_group = FLB_TRUE; + } + + flb_plg_debug(ctx->ins, "Using stream=%s, group=%s", stream_name, group_name); - return get_dynamic_log_stream(ctx, tag, tag_len); + stream = get_or_create_log_stream(ctx, stream_name, group_name); + + if (free_group == FLB_TRUE) { + flb_sds_destroy(group_name); + } + if (free_stream == FLB_TRUE) { + flb_sds_destroy(stream_name); + } + return stream; } -static int set_log_group_retention(struct flb_cloudwatch *ctx) + +static int set_log_group_retention(struct flb_cloudwatch *ctx, struct log_stream *stream) { if (ctx->log_retention_days <= 0) { /* no need to set */ @@ -1088,9 +1143,9 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx) flb_sds_t tmp; flb_sds_t error; - flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", ctx->log_group, ctx->log_retention_days); + flb_plg_info(ctx->ins, "Setting retention policy on log group %s to %dd", stream->group, ctx->log_retention_days); - body = flb_sds_create_size(68 + strlen(ctx->log_group)); + body = flb_sds_create_size(68 + strlen(stream->group)); if (!body) { flb_sds_destroy(body); flb_errno(); @@ -1098,7 +1153,7 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx) } /* construct CreateLogGroup request body */ - tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\",\"retentionInDays\":%d}", ctx->log_group, ctx->log_retention_days); + tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\",\"retentionInDays\":%d}", stream->group, ctx->log_retention_days); if (!tmp) { flb_sds_destroy(body); flb_errno(); @@ -1152,7 +1207,7 @@ static int set_log_group_retention(struct flb_cloudwatch *ctx) return -1; } -int create_log_group(struct flb_cloudwatch *ctx) +int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream) { struct flb_http_client *c = NULL; struct flb_aws_client *cw_client; @@ -1161,9 +1216,9 @@ int create_log_group(struct flb_cloudwatch *ctx) flb_sds_t error; int ret; - flb_plg_info(ctx->ins, "Creating log group %s", ctx->log_group); + flb_plg_info(ctx->ins, "Creating log group %s", stream->group); - body = flb_sds_create_size(25 + strlen(ctx->log_group)); + body = flb_sds_create_size(25 + strlen(stream->group)); if (!body) { flb_sds_destroy(body); flb_errno(); @@ -1171,7 +1226,7 @@ int create_log_group(struct flb_cloudwatch *ctx) } /* construct CreateLogGroup request body */ - tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\"}", ctx->log_group); + tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\"}", stream->group); if (!tmp) { flb_sds_destroy(body); flb_errno(); @@ -1194,11 +1249,10 @@ int create_log_group(struct flb_cloudwatch *ctx) if (c->resp.status == 200) { /* success */ - flb_plg_info(ctx->ins, "Created log group %s", ctx->log_group); - ctx->group_created = FLB_TRUE; + flb_plg_info(ctx->ins, "Created log group %s", stream->group); flb_sds_destroy(body); flb_http_client_destroy(c); - ret = set_log_group_retention(ctx); + ret = set_log_group_retention(ctx, stream); return ret; } @@ -1208,17 +1262,16 @@ int create_log_group(struct flb_cloudwatch *ctx) if (error != NULL) { if (strcmp(error, ERR_CODE_ALREADY_EXISTS) == 0) { flb_plg_info(ctx->ins, "Log Group %s already exists", - ctx->log_group); - ctx->group_created = FLB_TRUE; + stream->group); flb_sds_destroy(body); flb_sds_destroy(error); flb_http_client_destroy(c); - ret = set_log_group_retention(ctx); + ret = set_log_group_retention(ctx, stream); return ret; } /* some other error occurred; notify user */ flb_aws_print_error(c->resp.payload, c->resp.payload_size, - "CreateLogGroup", ctx->ins); + "CreateLogGroup", ctx->ins); flb_sds_destroy(error); } else { @@ -1248,9 +1301,9 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int ret; flb_plg_info(ctx->ins, "Creating log stream %s in log group %s", - stream->name, ctx->log_group); + stream->name, stream->group); - body = flb_sds_create_size(50 + strlen(ctx->log_group) + + body = flb_sds_create_size(50 + strlen(stream->group) + strlen(stream->name)); if (!body) { flb_sds_destroy(body); @@ -1261,7 +1314,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, /* construct CreateLogStream request body */ tmp = flb_sds_printf(&body, "{\"logGroupName\":\"%s\",\"logStreamName\":\"%s\"}", - ctx->log_group, + stream->group, stream->name); if (!tmp) { flb_sds_destroy(body); @@ -1312,8 +1365,8 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, if (ctx->create_group == FLB_TRUE) { flb_plg_info(ctx->ins, "Log Group %s not found. Will attempt to create it.", - ctx->log_group); - ret = create_log_group(ctx); + stream->group); + ret = create_log_group(ctx, stream); if (ret < 0) { return -1; } else { @@ -1327,7 +1380,7 @@ int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, } } else { flb_plg_error(ctx->ins, "Log Group %s not found and `auto_create_group` disabled.", - ctx->log_group); + stream->group); } return -1; } @@ -1366,17 +1419,6 @@ int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, int num_headers = 1; int retry = FLB_TRUE; - buf->put_events_calls++; - - if (buf->put_events_calls >= 4) { - /* - * In normal execution, even under high throughput, 4+ calls per flush - * should be extremely rare. This is needed for edge cases basically. - */ - flb_plg_debug(ctx->ins, "Too many calls this flush, sleeping for 250 ms"); - usleep(250000); - } - flb_plg_debug(ctx->ins, "Sending log events to log stream %s", stream->name); /* stream is being used, update expiration */ diff --git a/plugins/out_cloudwatch_logs/cloudwatch_api.h b/plugins/out_cloudwatch_logs/cloudwatch_api.h index 128c15007a1..99919055bc0 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_api.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_api.h @@ -42,16 +42,16 @@ void cw_flush_destroy(struct cw_flush *buf); -int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, struct cw_flush *buf, - struct log_stream *stream, +int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin, + struct cw_flush *buf, flb_sds_t tag, const char *data, size_t bytes); int create_log_stream(struct flb_cloudwatch *ctx, struct log_stream *stream, int can_retry); -struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, - const char *tag, int tag_len); +struct log_stream *get_log_stream(struct flb_cloudwatch *ctx, flb_sds_t tag, + const msgpack_object map); int put_log_events(struct flb_cloudwatch *ctx, struct cw_flush *buf, struct log_stream *stream, size_t payload_size); -int create_log_group(struct flb_cloudwatch *ctx); +int create_log_group(struct flb_cloudwatch *ctx, struct log_stream *stream); int compare_events(const void *a_arg, const void *b_arg); #endif diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.c b/plugins/out_cloudwatch_logs/cloudwatch_logs.c index e238d43ad3d..f3898355eb2 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.c +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.c @@ -81,6 +81,11 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, tmp = flb_output_get_property("log_group_name", ins); if (tmp) { ctx->log_group = tmp; + ctx->group_name = flb_sds_create(tmp); + if (!ctx->group_name) { + flb_plg_error(ctx->ins, "Could not create log group context property"); + goto error; + } } else { flb_plg_error(ctx->ins, "'log_group_name' is a required field"); goto error; @@ -89,6 +94,11 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, tmp = flb_output_get_property("log_stream_name", ins); if (tmp) { ctx->log_stream_name = tmp; + ctx->stream_name = flb_sds_create(tmp); + if (!ctx->stream_name) { + flb_plg_error(ctx->ins, "Could not create log group context property"); + goto error; + } } tmp = flb_output_get_property("log_stream_prefix", ins); @@ -108,6 +118,24 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, goto error; } + tmp = flb_output_get_property("log_group_template", ins); + if (tmp) { + ctx->ra_group = flb_ra_create((char *) tmp, FLB_FALSE); + if (ctx->ra_group == NULL) { + flb_plg_error(ctx->ins, "Could not parse `log_group_template`"); + goto error; + } + } + + tmp = flb_output_get_property("log_stream_template", ins); + if (tmp) { + ctx->ra_stream = flb_ra_create((char *) tmp, FLB_FALSE); + if (ctx->ra_stream == NULL) { + flb_plg_error(ctx->ins, "Could not parse `log_stream_template`"); + goto error; + } + } + tmp = flb_output_get_property("log_format", ins); if (tmp) { ctx->log_format = tmp; @@ -184,8 +212,6 @@ static int cb_cloudwatch_init(struct flb_output_instance *ins, ctx->sts_endpoint = (char *) tmp; } - ctx->group_created = FLB_FALSE; - /* init log streams */ if (ctx->log_stream_name) { ctx->stream.name = flb_sds_create(ctx->log_stream_name); @@ -383,19 +409,10 @@ static void cb_cloudwatch_flush(struct flb_event_chunk *event_chunk, { struct flb_cloudwatch *ctx = out_context; int event_count; - struct log_stream *stream = NULL; (void) i_ins; (void) config; - ctx->buf->put_events_calls = 0; - - stream = get_log_stream(ctx, - event_chunk->tag, flb_sds_len(event_chunk->tag)); - if (!stream) { - FLB_OUTPUT_RETURN(FLB_RETRY); - } - - event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, stream, + event_count = process_and_send(ctx, i_ins->p->name, ctx->buf, event_chunk->tag, event_chunk->data, event_chunk->size); if (event_count < 0) { flb_plg_error(ctx->ins, "Failed to send events"); @@ -447,6 +464,22 @@ void flb_cloudwatch_ctx_destroy(struct flb_cloudwatch *ctx) flb_free(ctx->endpoint); } + if (ctx->ra_group) { + flb_ra_destroy(ctx->ra_group); + } + + if (ctx->ra_stream) { + flb_ra_destroy(ctx->ra_stream); + } + + if (ctx->group_name) { + flb_sds_destroy(ctx->group_name); + } + + if (ctx->stream_name) { + flb_sds_destroy(ctx->stream_name); + } + if (ctx->log_stream_name) { if (ctx->stream.name) { flb_sds_destroy(ctx->stream.name); @@ -482,6 +515,9 @@ void log_stream_destroy(struct log_stream *stream) if (stream->sequence_token) { flb_sds_destroy(stream->sequence_token); } + if (stream->group) { + flb_sds_destroy(stream->group); + } flb_free(stream); } } @@ -513,6 +549,20 @@ static struct flb_config_map config_map[] = { " to form the stream name" }, + { + FLB_CONFIG_MAP_STR, "log_group_template", NULL, + 0, FLB_FALSE, 0, + "Template for CW Log Group name using record accessor syntax. " + "Plugin falls back to the log_group_name configured if needed." + }, + + { + FLB_CONFIG_MAP_STR, "log_stream_template", NULL, + 0, FLB_FALSE, 0, + "Template for CW Log Stream name using record accessor syntax. " + "Plugin falls back to the log_stream_name or log_stream_prefix configured if needed." + }, + { FLB_CONFIG_MAP_STR, "log_key", NULL, 0, FLB_FALSE, 0, diff --git a/plugins/out_cloudwatch_logs/cloudwatch_logs.h b/plugins/out_cloudwatch_logs/cloudwatch_logs.h index 80c00476e3e..86669dfa393 100644 --- a/plugins/out_cloudwatch_logs/cloudwatch_logs.h +++ b/plugins/out_cloudwatch_logs/cloudwatch_logs.h @@ -27,6 +27,9 @@ #include #include +#include +#include + /* buffers used for each flush */ struct cw_flush { /* temporary buffer for storing the serialized event messages */ @@ -52,16 +55,8 @@ struct cw_flush { char *event_buf; size_t event_buf_size; - /* - * According to the docs: - * PutLogEvents: 5 requests per second per log stream. - * Additional requests are throttled. This quota can't be changed. - * This plugin fast. A single flush might make more than 5 calls, - * Then fail, then retry, then be too fast again, on and on. - * I have seen this happen. - * So we throttle ourselves if more than 5 calls are made per flush - */ - int put_events_calls; + /* current log stream that we are sending records too */ + struct log_stream *current_stream; }; struct cw_event { @@ -74,6 +69,7 @@ struct cw_event { struct log_stream { flb_sds_t name; + flb_sds_t group; flb_sds_t sequence_token; /* * log streams in CloudWatch do not expire; but our internal representations @@ -122,18 +118,22 @@ struct flb_cloudwatch { /* Should the plugin create the log group */ int create_group; + flb_sds_t group_name; + flb_sds_t stream_name; + /* Should requests to AWS services be retried */ int retry_requests; /* If set to a number greater than zero, and newly create log group's retention policy is set to this many days. */ int log_retention_days; - /* has the log group successfully been created */ - int group_created; - /* must be freed on shutdown if custom_endpoint is not set */ char *endpoint; + /* templates */ + struct flb_record_accessor *ra_group; + struct flb_record_accessor *ra_stream; + /* if we're writing to a static log stream, we'll use this */ struct log_stream stream; int stream_created;