Skip to content

Commit

Permalink
out_opentelemetry: add support for gzip compression (#6232)
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Bezannier <[email protected]>
  • Loading branch information
flobz authored Nov 16, 2022
1 parent fe13b92 commit 6e0d840
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 3 deletions.
35 changes: 32 additions & 3 deletions plugins/out_opentelemetry/opentelemetry.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <fluent-otel-proto/fluent-otel.h>

#include <cmetrics/cmetrics.h>
#include <fluent-bit/flb_gzip.h>
#include <cmetrics/cmt_encode_opentelemetry.h>

#include <ctraces/ctraces.h>
Expand Down Expand Up @@ -140,6 +141,9 @@ static int http_post(struct opentelemetry_context *ctx,
struct flb_config_map_val *mv;
struct flb_slist_entry *key = NULL;
struct flb_slist_entry *val = NULL;
void *final_body = NULL;
size_t final_body_len = 0;
int compressed = FLB_FALSE;

/* Get upstream context and connection */
u = ctx->u;
Expand All @@ -149,10 +153,21 @@ static int http_post(struct opentelemetry_context *ctx,
u->tcp_host, u->tcp_port);
return FLB_RETRY;
}

if (ctx->compress_gzip == FLB_TRUE) {
ret = flb_gzip_compress((void *) body, body_len,
&final_body, &final_body_len);
if (ret == -1) {
flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression");
} else {
compressed = FLB_TRUE;
}
} else {
final_body = body;
final_body_len = body_len;
}
/* Create HTTP client context */
c = flb_http_client(u_conn, FLB_HTTP_POST, uri,
body, body_len,
final_body, final_body_len,
ctx->host, ctx->port,
ctx->proxy, 0);

Expand Down Expand Up @@ -192,7 +207,9 @@ static int http_post(struct opentelemetry_context *ctx,
key->str, flb_sds_len(key->str),
val->str, flb_sds_len(val->str));
}

if (compressed == FLB_TRUE) {
flb_http_set_content_encoding_gzip(c);
}
ret = flb_http_do(c, &b_sent);
if (ret == 0) {
/*
Expand Down Expand Up @@ -239,6 +256,13 @@ static int http_post(struct opentelemetry_context *ctx,
out_ret = FLB_RETRY;
}

/*
* If the payload buffer is different than incoming records in body, means
* we generated a different payload and must be freed.
*/
if (final_body != body) {
flb_free(final_body);
}
/* Destroy HTTP client context */
flb_http_client_destroy(c);

Expand Down Expand Up @@ -1022,6 +1046,11 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
"Specify if the response paylod should be logged or not"
},
{
FLB_CONFIG_MAP_STR, "compress", NULL,
0, FLB_FALSE, 0,
"Set payload compression mechanism. Option available is 'gzip'"
},
/* EOF */
{0}
};
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_opentelemetry/opentelemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ struct opentelemetry_context {

/* instance context */
struct flb_output_instance *ins;

/* Compression mode (gzip) */
int compress_gzip;
};

#endif
9 changes: 9 additions & 0 deletions plugins/out_opentelemetry/opentelemetry_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create(
char *logs_uri = NULL;
struct flb_upstream *upstream;
struct opentelemetry_context *ctx = NULL;
char *tmp = NULL;

/* Allocate plugin context */
ctx = flb_calloc(1, sizeof(struct opentelemetry_context));
Expand Down Expand Up @@ -232,6 +233,14 @@ struct opentelemetry_context *flb_opentelemetry_context_create(
/* Set instance flags into upstream */
flb_output_upstream_set(ctx->u, ins);

tmp = flb_output_get_property("compress", ins);
ctx->compress_gzip = FLB_FALSE;
if (tmp) {
if (strcasecmp(tmp, "gzip") == 0) {
ctx->compress_gzip = FLB_TRUE;
}
}

return ctx;
}

Expand Down

0 comments on commit 6e0d840

Please sign in to comment.