diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e7c2e882bf19..a00d331d57f6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -728,6 +728,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add write access to `url.value` from `request.transforms` in `httpjson` input. {pull}27937[27937] - Add Base64 encoded HMAC and UUID template functions to `httpjson` input {pull}27873[27873] - Release checkpoint module as GA. {pull}27814[27814] +- Move processing to ingest node for AWS vpcflow fileset. {pull}28168[28168] *Heartbeat* diff --git a/libbeat/tests/system/beat/beat.py b/libbeat/tests/system/beat/beat.py index 436b6e489d91..195741b1d317 100644 --- a/libbeat/tests/system/beat/beat.py +++ b/libbeat/tests/system/beat/beat.py @@ -741,14 +741,18 @@ def assert_explicit_ecs_version_set(self, module, fileset): Assert that the module explicitly sets the ECS version field. """ def get_config_paths(modules_path, module, fileset): - pathname = os.path.abspath(modules_path + - "/" + - module + - "/" + - fileset + - "/" + - "config/*.yml") - return glob.glob(pathname) + fileset_path = os.path.abspath(modules_path + + "/" + + module + + "/" + + fileset + + "/") + paths = [] + for x in ["config/*.yml", "ingest/*.yml", "ingest/*.json"]: + pathname = os.path.join(fileset_path, x) + paths.extend(glob.glob(pathname)) + + return paths def is_ecs_version_set(path): # parsing the yml file would be better but go templates in @@ -759,9 +763,7 @@ def is_ecs_version_set(path): return True return False - errors = [] for cfg_path in get_config_paths(self.modules_path, module, fileset): - if not is_ecs_version_set(cfg_path): - errors.append("{}".format(cfg_path)) - if len(errors) > 0: - raise Exception("{}/{} ecs.version not explicitly set in:\n{}".format(module, fileset, '\n'.join(errors))) + if is_ecs_version_set(cfg_path): + return + raise Exception("{}/{} ecs.version not explicitly set in config or pipeline".format(module, fileset)) diff --git a/x-pack/filebeat/module/aws/vpcflow/config/input.yml b/x-pack/filebeat/module/aws/vpcflow/config/input.yml index 12c57c500926..f5987c033d57 100644 --- a/x-pack/filebeat/module/aws/vpcflow/config/input.yml +++ b/x-pack/filebeat/module/aws/vpcflow/config/input.yml @@ -76,126 +76,3 @@ exclude_files: [".gz$"] {{ end }} tags: {{.tags | tojson}} publisher_pipeline.disable_host: {{ inList .tags "forwarded" }} - -processors: - - drop_event: - when.regexp.message: "^version" - - drop_event: - when.regexp.message: "^instance-id" - - - script: - lang: javascript - source: > - function process(event) { - var message = event.Get("message"); - var tokens = message.split(" ").length; - event.Put("@metadata.message_token_count", tokens); - } - - # Default vpc flow log format - - dissect: - when: - equals: - '@metadata.message_token_count': 14 - field: message - target_prefix: aws.vpcflow - tokenizer: '%{version} %{account_id} %{interface_id} %{srcaddr} %{dstaddr} %{srcport} %{dstport} %{protocol} %{packets} %{bytes} %{start} %{end} %{action} %{log_status}' - - # Custom flow log for traffic through a NAT gateway - - dissect: - when: - equals: - '@metadata.message_token_count': 6 - field: message - target_prefix: aws.vpcflow - tokenizer: '%{instance_id} %{interface_id} %{srcaddr} %{dstaddr} %{pkt_srcaddr} %{pkt_dstaddr}' - - # Custom flow log for traffic through a transit gateway - - dissect: - when: - equals: - '@metadata.message_token_count': 17 - field: message - target_prefix: aws.vpcflow - tokenizer: '%{version} %{interface_id} %{account_id} %{vpc_id} %{subnet_id} %{instance_id} %{srcaddr} %{dstaddr} %{srcport} %{dstport} %{protocol} %{tcp_flags} %{type} %{pkt_srcaddr} %{pkt_dstaddr} %{action} %{log_status}' - - # TCP Flag Sequence - - dissect: - when: - equals: - '@metadata.message_token_count': 21 - field: message - target_prefix: aws.vpcflow - tokenizer: '%{version} %{vpc_id} %{subnet_id} %{instance_id} %{interface_id} %{account_id} %{type} %{srcaddr} %{dstaddr} %{srcport} %{dstport} %{pkt_srcaddr} %{pkt_dstaddr} %{protocol} %{bytes} %{packets} %{start} %{end} %{action} %{tcp_flags} %{log_status}' - - - convert: - ignore_missing: true - fields: - - {from: aws.vpcflow.srcaddr, to: source.address} - - {from: aws.vpcflow.srcaddr, to: source.ip, type: ip} - - {from: aws.vpcflow.srcport, to: source.port, type: long} - - {from: aws.vpcflow.dstaddr, to: destination.address} - - {from: aws.vpcflow.dstaddr, to: destination.ip, type: ip} - - {from: aws.vpcflow.dstport, to: destination.port, type: long} - - {from: aws.vpcflow.protocol, to: network.iana_number, type: string} - - {from: aws.vpcflow.packets, to: source.packets, type: long} - - {from: aws.vpcflow.bytes, to: source.bytes, type: long} - - {from: aws.vpcflow.packets, to: network.packets, type: long} - - {from: aws.vpcflow.bytes, to: network.bytes, type: long} - - - drop_fields: - fields: ["aws.vpcflow.srcaddr", "aws.vpcflow.srcport", "aws.vpcflow.dstaddr", "aws.vpcflow.dstport", "aws.vpcflow.bytes", "aws.vpcflow.packets", "aws.vpcflow.protocol"] - - - community_id: ~ - - # Use the aws.vpcflow.action value to set the event.outcome value to either "allow" or "deny". - - add_fields: - when.equals.aws.vpcflow.action: ACCEPT - target: event - fields: {outcome: allow} - - add_fields: - when.equals.aws.vpcflow.action: REJECT - target: event - fields: {outcome: deny} - - - add_fields: - target: event - fields: {type: flow} - - add_fields: - target: event - fields: {category: network_traffic} - - # Add network.type: ipv4 or ipv6 - - if: - contains.source.ip: "." - then: - - add_fields: - target: network - fields: {type: ipv4} - - - if: - contains.source.ip: ":" - then: - - add_fields: - target: network - fields: {type: ipv6} - - # Add network.transport: based on IANA protocol number of the traffic - # http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml - - if: - equals.network.iana_number: "6" - then: - - add_fields: - target: network - fields: {transport: tcp} - - if: - equals.network.iana_number: "17" - then: - - add_fields: - target: network - fields: {transport: udp} - - - add_fields: - target: '' - fields: - ecs.version: 1.11.0 diff --git a/x-pack/filebeat/module/aws/vpcflow/ingest/pipeline.yml b/x-pack/filebeat/module/aws/vpcflow/ingest/pipeline.yml index 2ce2d4a1ad71..660ce87ab13b 100644 --- a/x-pack/filebeat/module/aws/vpcflow/ingest/pipeline.yml +++ b/x-pack/filebeat/module/aws/vpcflow/ingest/pipeline.yml @@ -1,37 +1,76 @@ +--- description: Pipeline for AWS VPC Flow Logs processors: - set: field: event.ingested value: '{{_ingest.timestamp}}' + - set: + field: ecs.version + value: '1.11.0' + - rename: + field: message + target_field: event.original + ignore_missing: true + - set: + field: event.type + value: flow + - set: + field: event.category + value: network_traffic + - drop: + if: 'ctx.event?.original.startsWith("version") || ctx.event?.original.startsWith("instance-id")' + - script: + lang: painless + if: ctx.event?.original != null + source: >- + ctx._temp_ = new HashMap(); + ctx._temp_.message_token_count = ctx.event?.original.splitOnToken(" ").length; + - dissect: + field: event.original + pattern: '%{aws.vpcflow.version} %{aws.vpcflow.account_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.protocol} %{aws.vpcflow.packets} %{aws.vpcflow.bytes} %{aws.vpcflow.start} %{aws.vpcflow.end} %{aws.vpcflow.action} %{aws.vpcflow.log_status}' + if: ctx?._temp_?.message_token_count == 14 + - dissect: + field: event.original + pattern: '%{aws.vpcflow.instance_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr}' + if: ctx?._temp_?.message_token_count == 6 + - dissect: + field: event.original + pattern: '%{aws.vpcflow.version} %{aws.vpcflow.interface_id} %{aws.vpcflow.account_id} %{aws.vpcflow.vpc_id} %{aws.vpcflow.subnet_id} %{aws.vpcflow.instance_id} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.protocol} %{aws.vpcflow.tcp_flags} %{aws.vpcflow.type} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr} %{aws.vpcflow.action} %{aws.vpcflow.log_status}' + if: ctx?._temp_?.message_token_count == 17 + - dissect: + field: event.original + pattern: '%{aws.vpcflow.version} %{aws.vpcflow.vpc_id} %{aws.vpcflow.subnet_id} %{aws.vpcflow.instance_id} %{aws.vpcflow.interface_id} %{aws.vpcflow.account_id} %{aws.vpcflow.type} %{aws.vpcflow.srcaddr} %{aws.vpcflow.dstaddr} %{aws.vpcflow.srcport} %{aws.vpcflow.dstport} %{aws.vpcflow.pkt_srcaddr} %{aws.vpcflow.pkt_dstaddr} %{aws.vpcflow.protocol} %{aws.vpcflow.bytes} %{aws.vpcflow.packets} %{aws.vpcflow.start} %{aws.vpcflow.end} %{aws.vpcflow.action} %{aws.vpcflow.tcp_flags} %{aws.vpcflow.log_status}' + if: ctx?._temp_?.message_token_count == 21 # Convert Unix epoch to timestamp - date: - field: "aws.vpcflow.end" - target_field: "@timestamp" + field: aws.vpcflow.end + target_field: '@timestamp' ignore_failure: true formats: - UNIX - date: - field: "aws.vpcflow.start" - target_field: "event.start" + field: aws.vpcflow.start + target_field: event.start ignore_failure: true formats: - UNIX - date: - field: "aws.vpcflow.end" - target_field: "event.end" + field: aws.vpcflow.end + target_field: event.end ignore_failure: true formats: - UNIX - remove: - field: ["aws.vpcflow.start", "aws.vpcflow.end"] + field: + - aws.vpcflow.start + - aws.vpcflow.end ignore_missing: true - - script: lang: painless ignore_failure: true - if: ctx?.aws != null + if: ctx.aws != null source: >- void handleMap(Map map) { for (def x : map.values()) { @@ -53,7 +92,81 @@ processors: } } handleMap(ctx.aws); - + - set: + field: event.outcome + value: allow + if: ctx.aws?.vpcflow?.action == "ACCEPT" + - set: + field: event.outcome + value: deny + if: ctx.aws?.vpcflow?.action == "REJECT" + - rename: + field: aws.vpcflow.srcaddr + target_field: source.address + ignore_missing: true + - set: + field: source.ip + copy_from: source.address + if: ctx.source?.address != null + - convert: + field: aws.vpcflow.srcport + target_field: source.port + type: integer + ignore_missing: true + - rename: + field: aws.vpcflow.dstaddr + target_field: destination.address + ignore_missing: true + - set: + field: destination.ip + copy_from: destination.address + if: ctx.destination?.address != null + - convert: + field: aws.vpcflow.dstport + target_field: destination.port + type: integer + ignore_missing: true + - rename: + field: aws.vpcflow.protocol + target_field: network.iana_number + ignore_missing: true + - convert: + field: aws.vpcflow.packets + target_field: source.packets + type: long + ignore_missing: true + - convert: + field: aws.vpcflow.bytes + target_field: source.bytes + type: long + ignore_missing: true + - set: + field: network.bytes + copy_from: source.bytes + if: ctx.source?.bytes != null + - set: + field: network.packets + copy_from: source.packets + if: ctx.source?.packets != null + - set: + field: network.type + value: ipv4 + if: 'ctx.source?.ip != null && ctx.source?.ip.contains(".")' + - set: + field: network.type + value: ipv6 + if: 'ctx.source?.ip != null && ctx.source?.ip.contains(":")' + - set: + field: network.transport + value: tcp + if: ctx.network?.iana_number == "6" + - set: + field: network.transport + value: udp + if: ctx.network?.iana_number == "17" + - community_id: + target_field: network.community_id + ignore_failure: true # IP Geolocation Lookup - geoip: field: source.ip @@ -63,7 +176,6 @@ processors: field: destination.ip target_field: destination.geo ignore_missing: true - # IP Autonomous System (AS) Lookup - geoip: database_file: GeoLite2-ASN.mmdb @@ -81,7 +193,6 @@ processors: - asn - organization_name ignore_missing: true - - rename: field: source.as.asn target_field: source.as.number @@ -98,36 +209,25 @@ processors: field: destination.as.organization_name target_field: destination.as.organization.name ignore_missing: true - - rename: - field: message - target_field: event.original - ignore_missing: true - # Generate related.ip field - append: - if: ctx.source?.ip != null && ctx.destination?.ip != null + if: 'ctx.source?.ip != null && ctx.destination?.ip != null' field: related.ip value: ["{{source.ip}}", "{{destination.ip}}"] - - set: field: cloud.provider value: aws - - set: + if: ctx.aws?.vpcflow?.account_id != null field: cloud.account.id - value: "{{aws.vpcflow.account_id}}" - ignore_empty_value: true - + value: '{{aws.vpcflow.account_id}}' - set: - if: "ctx.aws?.vpcflow?.instance_id != null && ctx.aws.vpcflow.instance_id != '-'" + if: 'ctx?.aws?.vpcflow?.instance_id != null && ctx.aws.vpcflow.instance_id != "-"' field: cloud.instance.id - value: "{{aws.vpcflow.instance_id}}" - ignore_empty_value: true - + value: '{{aws.vpcflow.instance_id}}' - set: field: event.kind value: event - - script: lang: painless ignore_failure: true @@ -158,8 +258,23 @@ processors: if ((flags & 0x20) != 0) { ctx.aws.vpcflow.tcp_flags_array.add('urg'); } - + - remove: + field: + - _temp_ + - aws.vpcflow.srcaddr + - aws.vpcflow.srcport + - aws.vpcflow.dstaddr + - aws.vpcflow.dstport + - aws.vpcflow.bytes + - aws.vpcflow.packets + - aws.vpcflow.protocol + ignore_missing: true + - remove: + field: event.original + if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))" + ignore_failure: true + ignore_missing: true on_failure: - set: - field: "error.message" - value: "{{ _ingest.on_failure_message }}" + field: 'error.message' + value: '{{ _ingest.on_failure_message }}' diff --git a/x-pack/filebeat/module/aws/vpcflow/manifest.yml b/x-pack/filebeat/module/aws/vpcflow/manifest.yml index cd16451fcba7..87850096ed56 100644 --- a/x-pack/filebeat/module/aws/vpcflow/manifest.yml +++ b/x-pack/filebeat/module/aws/vpcflow/manifest.yml @@ -17,7 +17,7 @@ var: - name: session_token - name: role_arn - name: tags - default: [forwarded] + default: [forwarded, preserve_original_event] - name: fips_enabled - name: proxy_url - name: max_number_of_messages diff --git a/x-pack/filebeat/module/aws/vpcflow/test/accept-reject-traffic.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/accept-reject-traffic.log-expected.json index 9e7aa834d8a6..dbc6ebb3150a 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/accept-reject-traffic.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/accept-reject-traffic.log-expected.json @@ -57,7 +57,8 @@ "source.packets": 20, "source.port": 20641, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -118,7 +119,8 @@ "source.packets": 20, "source.port": 49761, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -161,7 +163,8 @@ "source.packets": 4, "source.port": 0, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -204,7 +207,8 @@ "source.packets": 4, "source.port": 0, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/bad.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/bad.log-expected.json index 534c05beba51..7e762a71c7ef 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/bad.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/bad.log-expected.json @@ -12,7 +12,8 @@ "log.offset": 0, "service.type": "aws", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/custom-nat-gateway.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/custom-nat-gateway.log-expected.json index a5fbe695b279..11b49a3af565 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/custom-nat-gateway.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/custom-nat-gateway.log-expected.json @@ -24,7 +24,8 @@ "source.address": "10.0.1.5", "source.ip": "10.0.1.5", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -54,7 +55,8 @@ "source.address": "10.0.1.5", "source.ip": "10.0.1.5", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/custom-transit-gateway.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/custom-transit-gateway.log-expected.json index f8b8a3a33574..c0258dd5f8e5 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/custom-transit-gateway.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/custom-transit-gateway.log-expected.json @@ -45,7 +45,8 @@ "source.ip": "10.20.33.164", "source.port": 39812, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/ipv6.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/ipv6.log-expected.json index ac0ead951e91..e487f326cf92 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/ipv6.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/ipv6.log-expected.json @@ -40,7 +40,8 @@ "source.packets": 54, "source.port": 34892, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/no-data-skip-data.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/no-data-skip-data.log-expected.json index e8224ee08b11..d055f270eccb 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/no-data-skip-data.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/no-data-skip-data.log-expected.json @@ -20,7 +20,8 @@ "log.offset": 0, "service.type": "aws", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -44,7 +45,8 @@ "log.offset": 82, "service.type": "aws", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence-skip-data.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence-skip-data.log-expected.json index b28207021b6a..db3c56d1969d 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence-skip-data.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence-skip-data.log-expected.json @@ -24,7 +24,8 @@ "log.offset": 183, "service.type": "aws", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -52,7 +53,8 @@ "log.offset": 526, "service.type": "aws", "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file diff --git a/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence.log-expected.json b/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence.log-expected.json index 24135b909e7e..46420161eb5f 100644 --- a/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence.log-expected.json +++ b/x-pack/filebeat/module/aws/vpcflow/test/tcp-flag-sequence.log-expected.json @@ -61,7 +61,8 @@ "source.packets": 8, "source.port": 43416, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -127,7 +128,8 @@ "source.packets": 17, "source.port": 43638, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] }, { @@ -194,7 +196,8 @@ "source.packets": 14, "source.port": 5001, "tags": [ - "forwarded" + "forwarded", + "preserve_original_event" ] } ] \ No newline at end of file