Skip to content

Commit

Permalink
stackdriver: Support writing to textPayload field of Cloud Logging Lo…
Browse files Browse the repository at this point in the history
…gEntry. (fluent#8850)

Write payload to textPayload field of LogEntry if the text_payload_key
is string format and the only field after stripping special fields.

Signed-off-by: shuaichen <[email protected]>
  • Loading branch information
shuaich authored May 24, 2024
1 parent bf1a7e5 commit c4d6803
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 35 deletions.
102 changes: 68 additions & 34 deletions plugins/out_stackdriver/stackdriver.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ static flb_sds_t get_google_token(struct flb_stackdriver *ctx)
if (time(NULL) >= cached_expiration) {
return output;
} else {
/*
/*
* Cached token is expired. Wait on lock to use up-to-date token
* by either waiting for it to be refreshed or refresh it ourselves.
*/
Expand Down Expand Up @@ -1068,7 +1068,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx,
if (rval != NULL && rval->o.type == MSGPACK_OBJECT_STR) {
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
msgpack_pack_str_body(mp_pck, label_kv->key,
msgpack_pack_str_body(mp_pck, label_kv->key,
flb_sds_len(label_kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(rval->val.string));
msgpack_pack_str_body(mp_pck, rval->val.string,
Expand All @@ -1082,7 +1082,7 @@ static int pack_resource_labels(struct flb_stackdriver *ctx,
} else {
flb_mp_map_header_append(mh);
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->key));
msgpack_pack_str_body(mp_pck, label_kv->key,
msgpack_pack_str_body(mp_pck, label_kv->key,
flb_sds_len(label_kv->key));
msgpack_pack_str(mp_pck, flb_sds_len(label_kv->val));
msgpack_pack_str_body(mp_pck, label_kv->val,
Expand Down Expand Up @@ -1284,7 +1284,7 @@ static int cb_stackdriver_init(struct flb_output_instance *ins,
return -1;
}

if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
if (ctx->resource_type != RESOURCE_TYPE_GENERIC_NODE
&& ctx->resource_type != RESOURCE_TYPE_GENERIC_TASK) {
ret = gce_metadata_read_zone(ctx);
if (ret == -1) {
Expand Down Expand Up @@ -1434,13 +1434,13 @@ static int get_trace_sampled(int * trace_sampled_value, const msgpack_object * s
{
msgpack_object tmp;
int ret = get_msgpack_obj(&tmp, src_obj, key, flb_sds_len(key), MSGPACK_OBJECT_BOOLEAN);

if (ret == 0 && tmp.via.boolean == true) {
*trace_sampled_value = FLB_TRUE;
return 0;
} else if (ret == 0 && tmp.via.boolean == false) {
*trace_sampled_value = FLB_FALSE;
return 0;
return 0;
}

return -1;
Expand Down Expand Up @@ -1476,15 +1476,16 @@ static insert_id_status validate_insert_id(msgpack_object * insert_id_value,
return ret;
}

static int pack_json_payload(int insert_id_extracted,
int operation_extracted, int operation_extra_size,
int source_location_extracted,
int source_location_extra_size,
int http_request_extracted,
int http_request_extra_size,
timestamp_status tms_status,
msgpack_packer *mp_pck, msgpack_object *obj,
struct flb_stackdriver *ctx)
static int pack_payload(int insert_id_extracted,
int operation_extracted,
int operation_extra_size,
int source_location_extracted,
int source_location_extra_size,
int http_request_extracted,
int http_request_extra_size,
timestamp_status tms_status,
msgpack_packer *mp_pck, msgpack_object *obj,
struct flb_stackdriver *ctx)
{
/* Specified fields include local_resource_id, operation, sourceLocation ... */
int i, j;
Expand All @@ -1495,10 +1496,14 @@ static int pack_json_payload(int insert_id_extracted,
int len;
int len_to_be_removed;
int key_not_found;
int text_payload_len = 0;
int is_string_text_payload = FLB_FALSE;
int write_to_textpayload_field = FLB_FALSE;
flb_sds_t removed;
flb_sds_t monitored_resource_key;
flb_sds_t local_resource_id_key;
flb_sds_t stream;
flb_sds_t text_payload = NULL;
msgpack_object_kv *kv = obj->via.map.ptr;
msgpack_object_kv *const kvend = obj->via.map.ptr + obj->via.map.size;

Expand Down Expand Up @@ -1565,14 +1570,36 @@ static int pack_json_payload(int insert_id_extracted,

new_map_size = map_size - to_remove;

ret = msgpack_pack_map(mp_pck, new_map_size);
if (ret < 0) {
goto error;
if (ctx->text_payload_key && get_string(&text_payload, obj, ctx->text_payload_key) == 0) {
is_string_text_payload = FLB_TRUE;
}

/* write to textPayload if text_payload_key is the only residual string field*/
if ((new_map_size == 1) && is_string_text_payload) {
write_to_textpayload_field = FLB_TRUE;
}

if (write_to_textpayload_field) {
msgpack_pack_str(mp_pck, 11);
msgpack_pack_str_body(mp_pck, "textPayload", 11);

text_payload_len = flb_sds_len(text_payload);
msgpack_pack_str(mp_pck, text_payload_len);
msgpack_pack_str_body(mp_pck, text_payload, text_payload_len);
} else {
/* jsonPayload */
msgpack_pack_str(mp_pck, 11);
msgpack_pack_str_body(mp_pck, "jsonPayload", 11);

ret = msgpack_pack_map(mp_pck, new_map_size);
if (ret < 0) {
goto error;
}
}

/* points back to the beginning of map */
kv = obj->via.map.ptr;
for(; kv != kvend; ++kv ) {
for(; kv != kvend; ++kv) {
key_not_found = 1;

/* processing logging.googleapis.com/insertId */
Expand Down Expand Up @@ -1639,7 +1666,8 @@ static int pack_json_payload(int insert_id_extracted,
}
}

if (key_not_found) {
/* write residual log fields to jsonPayload */
if (key_not_found && !write_to_textpayload_field) {
ret = msgpack_pack_object(mp_pck, kv->key);
if (ret < 0) {
goto error;
Expand All @@ -1654,12 +1682,14 @@ static int pack_json_payload(int insert_id_extracted,
flb_sds_destroy(monitored_resource_key);
flb_sds_destroy(local_resource_id_key);
flb_sds_destroy(stream);
flb_sds_destroy(text_payload);
return 0;

error:
flb_sds_destroy(monitored_resource_key);
flb_sds_destroy(local_resource_id_key);
flb_sds_destroy(stream);
flb_sds_destroy(text_payload);
return ret;
}

Expand Down Expand Up @@ -1821,7 +1851,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
msgpack_pack_str_body(&mp_pck, "labels", 6);

ret = pack_resource_labels(ctx, &mh, &mp_pck, data, bytes);
if (ret != 0) {
if (ret != 0) {
if (ctx->resource_type == RESOURCE_TYPE_K8S) {
ret = extract_local_resource_id(data, bytes, ctx, tag);
if (ret != 0) {
Expand Down Expand Up @@ -2314,7 +2344,7 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
/* Extract httpRequest */
init_http_request(&http_request);
http_request_extra_size = 0;
http_request_extracted = extract_http_request(&http_request,
http_request_extracted = extract_http_request(&http_request,
ctx->http_request_key,
ctx->http_request_key_size,
obj, &http_request_extra_size);
Expand Down Expand Up @@ -2432,17 +2462,16 @@ static flb_sds_t stackdriver_format(struct flb_stackdriver *ctx,
flb_sds_destroy(source_location_function);
destroy_http_request(&http_request);

/* jsonPayload */
msgpack_pack_str(&mp_pck, 11);
msgpack_pack_str_body(&mp_pck, "jsonPayload", 11);
pack_json_payload(insert_id_extracted,
operation_extracted, operation_extra_size,
source_location_extracted,
source_location_extra_size,
http_request_extracted,
http_request_extra_size,
tms_status,
&mp_pck, obj, ctx);
/* both textPayload and jsonPayload are supported */
pack_payload(insert_id_extracted,
operation_extracted,
operation_extra_size,
source_location_extracted,
source_location_extra_size,
http_request_extracted,
http_request_extra_size,
tms_status,
&mp_pck, obj, ctx);

/* avoid modifying the original tag */
newtag = tag;
Expand Down Expand Up @@ -2594,7 +2623,7 @@ static void update_retry_metric(struct flb_stackdriver *ctx,
uint64_t ts,
int http_status)
{
char tmp[32];
char tmp[32];
char *name = (char *) flb_output_name(ctx->ins);

/* convert status to string format */
Expand Down Expand Up @@ -3154,6 +3183,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_stackdriver, resource_labels),
"Set the resource labels"
},
{
FLB_CONFIG_MAP_STR, "text_payload_key", (char *)NULL,
0, FLB_TRUE, offsetof(struct flb_stackdriver, text_payload_key),
"Set key for extracting text payload"
},
{
FLB_CONFIG_MAP_BOOL, "test_log_entry_format", "false",
0, FLB_TRUE, offsetof(struct flb_stackdriver, test_log_entry_format),
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_stackdriver/stackdriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ struct flb_stackdriver {
/* upstream context for metadata end-point */
struct flb_upstream *metadata_u;

/* the key to extract unstructured text payload from */
flb_sds_t text_payload_key;

#ifdef FLB_HAVE_METRICS
/* metrics */
struct cmt_counter *cmt_successful_requests;
Expand Down
26 changes: 26 additions & 0 deletions tests/runtime/data/stackdriver/stackdriver_test_payload.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#define STRING_TEXT_PAYLOAD "[" \
"1595349600," \
"{" \
"\"message\": \"The application errored out\"," \
"\"logging.googleapis.com/severity\": \"ERROR\"" \
"}]"

#define STRING_TEXT_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
"1595349600," \
"{" \
"\"message\": \"The application errored out\"," \
"\"logging.googleapis.com/severity\": \"ERROR\"," \
"\"errorCode\": \"400\"" \
"}]"

#define NON_SCALAR_PAYLOAD_WITH_RESIDUAL_FIELDS "[" \
"1595349600," \
"{" \
"\"message\": " \
"{" \
"\"application_name\": \"my_application\"," \
"\"error_message\": \"The application errored out\"," \
"}," \
"\"logging.googleapis.com/severity\": \"ERROR\"," \
"\"errorCode\": \"400\"" \
"}]"
Loading

0 comments on commit c4d6803

Please sign in to comment.