From 17f52e6ab7c68cc6f299635e9aa76aac2e7c2e94 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 1 Apr 2022 19:05:51 -0400 Subject: [PATCH] Add support for complex capture group names in parse_regex. --- confgenerator/logging_processors.go | 60 ++- .../golden_fluent_bit_main.conf | 95 ++++ .../golden_fluent_bit_parser.conf | 6 + .../golden_otel.conf | 445 ++++++++++++++++++ .../input.yaml | 26 + 5 files changed, 628 insertions(+), 4 deletions(-) create mode 100644 confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_main.conf create mode 100644 confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_parser.conf create mode 100644 confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_otel.conf create mode 100644 confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/input.yaml diff --git a/confgenerator/logging_processors.go b/confgenerator/logging_processors.go index 80f80f56cd..65fdcb1e02 100644 --- a/confgenerator/logging_processors.go +++ b/confgenerator/logging_processors.go @@ -17,6 +17,7 @@ package confgenerator import ( "fmt" "log" + "regexp" "strings" "github.com/GoogleCloudPlatform/ops-agent/confgenerator/filter" @@ -62,6 +63,52 @@ func init() { LoggingProcessorTypes.RegisterType(func() Component { return &LoggingProcessorParseJson{} }) } +// rewriteComplexCaptures translates disallowed capture group names into placeholders and a transformation to rename them back in the log record. 
+func rewriteComplexCaptures(regex, tag string) (string, []fluentbit.Component) {
+	// Short-circuit (returning the regex untouched and no components) when no capture group name has a character outside [A-Za-z0-9_]
+	disallowed := regexp.MustCompile(`\(\?P<(?:[A-Za-z0-9_]*[^A-Za-z0-9_>])+(?:[A-Za-z0-9_]*)>`)
+	if !disallowed.MatchString(regex) {
+		return regex, nil
+	}
+	// Maintain a list of rewritten capture group names; the name at index k is replaced by the placeholder "__<k+1>"
+	var rewrites []string
+	captureGroup := regexp.MustCompile(`\(\?P<((?:[^>\\]|\\.)*)>`)
+	// Can't use ReplaceAllStringFunc, since it doesn't support replacing only captured values
+	groupIndexes := captureGroup.FindAllStringSubmatchIndex(regex, -1)
+	l := 0 // Offset just past the last group name that was already handled
+	var r []string
+	for _, i := range groupIndexes {
+		g := regex[i[0]:i[1]] // Full match
+		s := regex[i[2]:i[3]] // First capture group
+		r = append(r, regex[l:i[2]]) // Text between the previous name and this one is copied verbatim
+		// Also replace any capture group whose name starts with "__" (presumably so it cannot clash with a "__<n>" placeholder)
+		if !disallowed.MatchString(g) && !strings.HasPrefix(s, "__") {
+			r = append(r, s)
+		} else {
+			rewrites = append(rewrites, s)
+			r = append(r, fmt.Sprintf("__%d", len(rewrites)))
+		}
+		l = i[3]
+	}
+	r = append(r, regex[l:])
+	// Reconstruct the regex
+	regex = strings.Join(r, "")
+	// Rename all captured fields; oc starts with length 0 (capacity only) so append does not leave empty leading entries
+	oc := make([][2]string, 0, len(rewrites))
+	for i, g := range rewrites {
+		oc = append(oc, [2]string{"Rename", fmt.Sprintf("__%d %q", i+1, g)})
+	}
+	rename := fluentbit.Component{
+		Kind: "FILTER",
+		Config: map[string]string{
+			"Match": tag,
+			"Name":  "modify",
+		},
+		OrderedConfig: oc,
+	}
+	return regex, []fluentbit.Component{rename}
+}
+
 // A LoggingProcessorParseRegex applies a regex to the specified field, storing the named capture groups as keys in the log record.
// This was maintained in addition to the parse_regex_complex to ensure backward compatibility with any existing configurations type LoggingProcessorParseRegex struct { @@ -77,14 +124,16 @@ func (r LoggingProcessorParseRegex) Type() string { } func (p LoggingProcessorParseRegex) Components(tag, uid string) []fluentbit.Component { + regex, transforms := rewriteComplexCaptures(p.Regex, tag) + parser, parserName := p.ParserShared.Component(tag, uid) parser.Config["Format"] = "regex" - parser.Config["Regex"] = p.Regex + parser.Config["Regex"] = regex - return []fluentbit.Component{ + return append([]fluentbit.Component{ parser, fluentbit.ParserFilterComponent(tag, p.Field, []string{parserName}), - } + }, transforms...) } type RegexParser struct { @@ -103,10 +152,13 @@ func (p LoggingProcessorParseRegexComplex) Components(tag, uid string) []fluentb parserNames := []string{} for idx, parserConfig := range p.Parsers { + regex, transforms := rewriteComplexCaptures(parserConfig.Regex, tag) + parser, parserName := parserConfig.Parser.Component(tag, fmt.Sprintf("%s.%d", uid, idx)) parser.Config["Format"] = "regex" - parser.Config["Regex"] = parserConfig.Regex + parser.Config["Regex"] = regex components = append(components, parser) + components = append(components, transforms...) 
parserNames = append(parserNames, parserName) } diff --git a/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_main.conf b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_main.conf new file mode 100644 index 0000000000..96653d38f6 --- /dev/null +++ b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_main.conf @@ -0,0 +1,92 @@ +@SET buffers_dir=/var/lib/google-cloud-ops-agent/fluent-bit/buffers +@SET logs_dir=/var/log/google-cloud-ops-agent/subagents + +[SERVICE] + Daemon off + Flush 1 + Log_Level info + dns.resolver legacy + storage.backlog.mem_limit 50M + storage.checksum on + storage.max_chunks_up 128 + storage.metrics on + storage.sync normal + +[INPUT] + Name fluentbit_metrics + Scrape_Interval 60 + Scrape_On_Start True + +[INPUT] + Buffer_Chunk_Size 512k + Buffer_Max_Size 5M + DB ${buffers_dir}/default_pipeline_syslog + Key message + Mem_Buf_Limit 10M + Name tail + Path /var/log/messages,/var/log/syslog + Read_from_Head True + Rotate_Wait 30 + Skip_Long_Lines On + Tag default_pipeline.syslog + storage.type filesystem + +[INPUT] + Buffer_Chunk_Size 512k + Buffer_Max_Size 5M + DB ${buffers_dir}/ops-agent-fluent-bit + Key message + Mem_Buf_Limit 10M + Name tail + Path ${logs_dir}/logging-module.log + Read_from_Head True + Rotate_Wait 30 + Skip_Long_Lines On + Tag ops-agent-fluent-bit + storage.type filesystem + +[FILTER] + Key_Name key_1 + Match default_pipeline.syslog + Name parser + Parser default_pipeline.syslog.0 + +[FILTER] + Match default_pipeline.syslog + Name modify + Rename __1 "logging.googleapis.com/severity" + Rename __2 "z*%\\>\\\\!" 
+ Rename __3 "__6" + +[FILTER] + Add logging.googleapis.com/logName syslog + Match default_pipeline.syslog + Name modify + +[OUTPUT] + Match_Regex ^(default_pipeline\.syslog)$ + Name stackdriver + Retry_Limit 3 + net.connect_timeout_log_error False + resource gce_instance + stackdriver_agent Google-Cloud-Ops-Agent-Logging/latest (BuildDistro=build_distro;Platform=linux;ShortName=linux_platform;ShortVersion=linux_platform_version) + tls On + tls.verify Off + workers 8 + +[OUTPUT] + Match_Regex ^(ops-agent-fluent-bit)$ + Name stackdriver + Retry_Limit 3 + net.connect_timeout_log_error False + resource gce_instance + stackdriver_agent Google-Cloud-Ops-Agent-Logging/latest (BuildDistro=build_distro;Platform=linux;ShortName=linux_platform;ShortVersion=linux_platform_version) + tls On + tls.verify Off + workers 8 + +[OUTPUT] + Match * + Name prometheus_exporter + host 0.0.0.0 + port 20202 diff --git a/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_parser.conf b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_parser.conf new file mode 100644 index 0000000000..1138196439 --- /dev/null +++ b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_fluent_bit_parser.conf @@ -0,0 +1,6 @@ +[PARSER] + Format regex + Name default_pipeline.syslog.0 + Regex ^(?P<__1>[EWID]) (?P<__2>.*) (?P<__3>.)$ + Time_Format time_format_1 + Time_Key time_key_1 diff --git a/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_otel.conf b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_otel.conf new file mode 100644 index 0000000000..a15a4f54e5 --- /dev/null +++ b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/golden_otel.conf @@ -0,0 +1,445 @@ +exporters: + googlecloud: + metric: + prefix: "" + user_agent: Google-Cloud-Ops-Agent-Metrics/latest 
(BuildDistro=build_distro;Platform=linux;ShortName=linux_platform;ShortVersion=linux_platform_version) +processors: + agentmetrics/default__pipeline_hostmetrics_0: + blank_label_metrics: + - system.cpu.utilization + filter/default__pipeline_hostmetrics_1: + metrics: + exclude: + match_type: strict + metric_names: + - system.cpu.time + - system.network.dropped + - system.filesystem.inodes.usage + - system.paging.faults + - system.disk.operation_time + filter/default__pipeline_hostmetrics_3: + metrics: + exclude: + match_type: regexp + metric_names: [] + filter/fluentbit_0: + metrics: + include: + match_type: strict + metric_names: + - fluentbit_uptime + filter/otel_0: + metrics: + include: + match_type: strict + metric_names: + - otelcol_process_uptime + - otelcol_process_memory_rss + - otelcol_grpc_io_client_completed_rpcs + - otelcol_googlecloudmonitoring_point_count + metricstransform/default__pipeline_hostmetrics_2: + transforms: + - action: update + include: system.cpu.time + new_name: cpu/usage_time + operations: + - action: toggle_scalar_data_type + - action: update_label + label: cpu + new_label: cpu_number + - action: update_label + label: state + new_label: cpu_state + - action: update + include: system.cpu.utilization + new_name: cpu/utilization + operations: + - action: aggregate_labels + aggregation_type: mean + label_set: + - state + - blank + - action: update_label + label: blank + new_label: cpu_number + - action: update_label + label: state + new_label: cpu_state + - action: update + include: system.cpu.load_average.1m + new_name: cpu/load_1m + - action: update + include: system.cpu.load_average.5m + new_name: cpu/load_5m + - action: update + include: system.cpu.load_average.15m + new_name: cpu/load_15m + - action: update + include: system.disk.read_io + new_name: disk/read_bytes_count + - action: update + include: system.disk.write_io + new_name: disk/write_bytes_count + - action: update + include: system.disk.operations + new_name: 
disk/operation_count + - action: update + include: system.disk.io_time + new_name: disk/io_time + operations: + - action: experimental_scale_value + experimental_scale: 1000.0 + - action: toggle_scalar_data_type + - action: update + include: system.disk.weighted_io_time + new_name: disk/weighted_io_time + operations: + - action: experimental_scale_value + experimental_scale: 1000.0 + - action: toggle_scalar_data_type + - action: update + include: system.disk.average_operation_time + new_name: disk/operation_time + operations: + - action: experimental_scale_value + experimental_scale: 1000.0 + - action: toggle_scalar_data_type + - action: update + include: system.disk.pending_operations + new_name: disk/pending_operations + operations: + - action: toggle_scalar_data_type + - action: update + include: system.disk.merged + new_name: disk/merged_operations + - action: update + include: system.filesystem.usage + new_name: disk/bytes_used + operations: + - action: toggle_scalar_data_type + - action: aggregate_labels + aggregation_type: max + label_set: + - device + - state + - action: update + include: system.filesystem.utilization + new_name: disk/percent_used + operations: + - action: aggregate_labels + aggregation_type: max + label_set: + - device + - state + - action: update + include: system.memory.usage + new_name: memory/bytes_used + operations: + - action: toggle_scalar_data_type + - action: aggregate_label_values + aggregated_values: + - slab_reclaimable + - slab_unreclaimable + aggregation_type: sum + label: state + new_value: slab + - action: update + include: system.memory.utilization + new_name: memory/percent_used + operations: + - action: aggregate_label_values + aggregated_values: + - slab_reclaimable + - slab_unreclaimable + aggregation_type: sum + label: state + new_value: slab + - action: update + include: system.network.io + new_name: interface/traffic + operations: + - action: update_label + label: interface + new_label: device + - action: 
update_label + label: direction + value_actions: + - new_value: rx + value: receive + - new_value: tx + value: transmit + - action: update + include: system.network.errors + new_name: interface/errors + operations: + - action: update_label + label: interface + new_label: device + - action: update_label + label: direction + value_actions: + - new_value: rx + value: receive + - new_value: tx + value: transmit + - action: update + include: system.network.packets + new_name: interface/packets + operations: + - action: update_label + label: interface + new_label: device + - action: update_label + label: direction + value_actions: + - new_value: rx + value: receive + - new_value: tx + value: transmit + - action: update + include: system.network.connections + new_name: network/tcp_connections + operations: + - action: toggle_scalar_data_type + - action: delete_label_value + label: protocol + label_value: udp + - action: update_label + label: state + new_label: tcp_state + - action: aggregate_labels + aggregation_type: sum + label_set: + - tcp_state + - action: add_label + new_label: port + new_value: all + - action: update + include: system.processes.created + new_name: processes/fork_count + - action: update + include: system.processes.count + new_name: processes/count_by_state + operations: + - action: toggle_scalar_data_type + - action: update_label + label: status + new_label: state + - action: update + include: system.paging.usage + new_name: swap/bytes_used + operations: + - action: toggle_scalar_data_type + - action: update + include: system.paging.utilization + new_name: swap/percent_used + - action: insert + include: swap/percent_used + new_name: pagefile/percent_used + operations: + - action: aggregate_labels + aggregation_type: sum + label_set: + - state + - action: update + include: system.paging.operations + new_name: swap/io + operations: + - action: aggregate_labels + aggregation_type: sum + label_set: + - direction + - action: update_label + label: 
direction + value_actions: + - new_value: in + value: page_in + - new_value: out + value: page_out + - action: update + include: process.cpu.time + new_name: processes/cpu_time + operations: + - action: experimental_scale_value + experimental_scale: 1e+06 + - action: toggle_scalar_data_type + - action: add_label + new_label: process + new_value: all + - action: delete_label_value + label: state + label_value: wait + - action: update_label + label: state + new_label: user_or_syst + - action: update_label + label: user_or_syst + value_actions: + - new_value: syst + value: system + - action: update + include: process.disk.read_io + new_name: processes/disk/read_bytes_count + operations: + - action: add_label + new_label: process + new_value: all + - action: update + include: process.disk.write_io + new_name: processes/disk/write_bytes_count + operations: + - action: add_label + new_label: process + new_value: all + - action: update + include: process.memory.physical_usage + new_name: processes/rss_usage + operations: + - action: toggle_scalar_data_type + - action: add_label + new_label: process + new_value: all + - action: update + include: process.memory.virtual_usage + new_name: processes/vm_usage + operations: + - action: toggle_scalar_data_type + - action: add_label + new_label: process + new_value: all + - action: update + include: ^(.*)$$ + match_type: regexp + new_name: agent.googleapis.com/$${1} + metricstransform/fluentbit_1: + transforms: + - action: update + include: fluentbit_uptime + new_name: agent/uptime + operations: + - action: toggle_scalar_data_type + - action: add_label + new_label: version + new_value: google-cloud-ops-agent-logging/latest-build_distro + - action: aggregate_labels + aggregation_type: sum + label_set: + - version + - action: update + include: ^(.*)$$ + match_type: regexp + new_name: agent.googleapis.com/$${1} + metricstransform/otel_1: + transforms: + - action: update + include: otelcol_process_uptime + new_name: agent/uptime + 
operations: + - action: toggle_scalar_data_type + - action: add_label + new_label: version + new_value: google-cloud-ops-agent-metrics/latest-build_distro + - action: aggregate_labels + aggregation_type: sum + label_set: + - version + - action: update + include: otelcol_process_memory_rss + new_name: agent/memory_usage + operations: + - action: aggregate_labels + aggregation_type: sum + label_set: [] + - action: update + include: otelcol_grpc_io_client_completed_rpcs + new_name: agent/api_request_count + operations: + - action: toggle_scalar_data_type + - action: update_label + label: grpc_client_status + new_label: state + - action: aggregate_labels + aggregation_type: sum + label_set: + - state + - action: update + include: otelcol_googlecloudmonitoring_point_count + new_name: agent/monitoring/point_count + operations: + - action: toggle_scalar_data_type + - action: aggregate_labels + aggregation_type: sum + label_set: + - status + - action: update + include: ^(.*)$$ + match_type: regexp + new_name: agent.googleapis.com/$${1} + resourcedetection/_global_0: + detectors: + - gce +receivers: + hostmetrics/default__pipeline_hostmetrics: + collection_interval: 60s + scrapers: + cpu: {} + disk: {} + filesystem: {} + load: {} + memory: {} + network: {} + paging: {} + process: + mute_process_name_error: true + processes: {} + prometheus/fluentbit: + config: + scrape_configs: + - job_name: logging-collector + metrics_path: /metrics + scrape_interval: 1m + static_configs: + - targets: + - 0.0.0.0:20202 + prometheus/otel: + config: + scrape_configs: + - job_name: otel-collector + scrape_interval: 1m + static_configs: + - targets: + - 0.0.0.0:20201 +service: + pipelines: + metrics/default__pipeline_hostmetrics: + exporters: + - googlecloud + processors: + - agentmetrics/default__pipeline_hostmetrics_0 + - filter/default__pipeline_hostmetrics_1 + - metricstransform/default__pipeline_hostmetrics_2 + - filter/default__pipeline_hostmetrics_3 + - resourcedetection/_global_0 + 
receivers: + - hostmetrics/default__pipeline_hostmetrics + metrics/fluentbit: + exporters: + - googlecloud + processors: + - filter/fluentbit_0 + - metricstransform/fluentbit_1 + - resourcedetection/_global_0 + receivers: + - prometheus/fluentbit + metrics/otel: + exporters: + - googlecloud + processors: + - filter/otel_0 + - metricstransform/otel_1 + - resourcedetection/_global_0 + receivers: + - prometheus/otel + telemetry: + metrics: + address: 0.0.0.0:20201 diff --git a/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/input.yaml b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/input.yaml new file mode 100644 index 0000000000..4089ffc85f --- /dev/null +++ b/confgenerator/testdata/valid/linux/logging-processor_parse_regex_rewrite_capture/input.yaml @@ -0,0 +1,26 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +logging: + processors: + parse_regex_1: + type: parse_regex + field: key_1 + regex: ^(?P<logging.googleapis.com/severity>[EWID]) (?P<z*%\>\\!>.*) (?P<__6>.)$ + time_key: time_key_1 + time_format: time_format_1 + service: + pipelines: + default_pipeline: + processors: [parse_regex_1]