Skip to content

Commit

Permalink
input_chunk: update records in the right place
Browse files Browse the repository at this point in the history
In commit ac4ec1f, the input chunk writing was moved down from the
previous spot to be below where input metrics are update. This PR moves
the metric update below the input chunk write so that the `ret ==
CIO_OK` check is actually right, and so that the metrics are only
updated after the write has occurred.

I also moved adding to `total_records` outside of the metric ifdef. This
appears to have been here for some time, but there are numerous output
plugins that rely on `event_chunk->total_events`, which is set from this
total_records value. This will mean it always gets updated, even if
metrics are disabled.

Signed-off-by: braydonk <[email protected]>
  • Loading branch information
braydonk authored and edsiper committed Nov 28, 2023
1 parent 8733b25 commit 1a8dd39
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -1499,31 +1499,6 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_chunk_trace_do_input(ic);
#endif /* FLB_HAVE_CHUNK_TRACE */

/* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
if (ret == CIO_OK) {
ic->added_records = n_records;
ic->total_records += n_records;
}

if (ic->total_records > 0) {
/* timestamp */
ts = cfl_time_now();

/* fluentbit_input_records_total */
cmt_counter_add(in->cmt_records, ts, ic->added_records,
1, (char *[]) {(char *) flb_input_name(in)});

/* fluentbit_input_bytes_total */
cmt_counter_add(in->cmt_bytes, ts, buf_size,
1, (char *[]) {(char *) flb_input_name(in)});

/* OLD api */
flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
}
#endif

filtered_data_buffer = NULL;
final_data_buffer = (char *) buf;
final_data_size = buf_size;
Expand Down Expand Up @@ -1555,6 +1530,31 @@ static int input_chunk_append_raw(struct flb_input_instance *in,
flb_free(filtered_data_buffer);
}

if (ret == CIO_OK) {
ic->added_records = n_records;
ic->total_records += n_records;
}

/* Update 'input' metrics */
#ifdef FLB_HAVE_METRICS
if (ic->total_records > 0) {
/* timestamp */
ts = cfl_time_now();

/* fluentbit_input_records_total */
cmt_counter_add(in->cmt_records, ts, ic->added_records,
1, (char *[]) {(char *) flb_input_name(in)});

/* fluentbit_input_bytes_total */
cmt_counter_add(in->cmt_bytes, ts, buf_size,
1, (char *[]) {(char *) flb_input_name(in)});

/* OLD api */
flb_metrics_sum(FLB_METRIC_N_RECORDS, ic->added_records, in->metrics);
flb_metrics_sum(FLB_METRIC_N_BYTES, buf_size, in->metrics);
}
#endif

if (ret == -1) {
flb_error("[input chunk] error writing data from %s instance",
in->name);
Expand Down

0 comments on commit 1a8dd39

Please sign in to comment.