diff --git a/src/flb_filter.c b/src/flb_filter.c index e5ea25d3186..99334aa22da 100644 --- a/src/flb_filter.c +++ b/src/flb_filter.c @@ -63,6 +63,7 @@ void flb_filter_do(struct flb_input_chunk *ic, int in_records = 0; int out_records = 0; int diff = 0; + int pre_records = 0; #endif char *ntag; const char *work_data; @@ -89,6 +90,12 @@ void flb_filter_do(struct flb_input_chunk *ic, work_data = (const char *) data; work_size = bytes; +#ifdef FLB_HAVE_METRICS + /* Count number of incoming records */ + in_records = ic->added_records; + pre_records = ic->total_records - in_records; +#endif + /* Iterate filters */ mk_list_foreach(head, &config->filters) { f_ins = mk_list_entry(head, struct flb_filter_instance, _head); @@ -108,11 +115,6 @@ void flb_filter_do(struct flb_input_chunk *ic, /* where to position the new content if modified ? */ write_at = (content_size - work_size); -#ifdef FLB_HAVE_METRICS - /* Count number of incoming records */ - in_records = flb_mp_count(work_data, work_size); -#endif - /* Invoke the filter callback */ ret = f_ins->p->cb_filter(work_data, /* msgpack buffer */ work_size, /* msgpack size */ @@ -131,11 +133,12 @@ void flb_filter_do(struct flb_input_chunk *ic, flb_input_chunk_write_at(ic, write_at, "", 0); #ifdef FLB_HAVE_METRICS + ic->total_records = pre_records; + /* Summarize all records removed */ flb_metrics_sum(FLB_METRIC_N_DROPPED, in_records, f_ins->metrics); #endif - break; } else { @@ -153,6 +156,10 @@ void flb_filter_do(struct flb_input_chunk *ic, flb_metrics_sum(FLB_METRIC_N_DROPPED, diff, f_ins->metrics); } + + /* set number of records in new chunk */ + in_records = out_records; + ic->total_records = pre_records + in_records; #endif } ret = flb_input_chunk_write_at(ic, write_at,