Skip to content

Commit

Permalink
[Filebeat] aws/vpcflow move processing to ingest node (#28168)
Browse files Browse the repository at this point in the history
* [Filebeat] aws/vpcflow move processing to ingest node

- move aws/vpcflow processing to ingest node
- sync with aws/vpcflow integration
- make event.original optional
- add preserve_original_event to default tags
- check for ecs.version in config or pipeline
  • Loading branch information
leehinman authored Sep 28, 2021
1 parent f852ac3 commit 6e69b05
Show file tree
Hide file tree
Showing 13 changed files with 195 additions and 184 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add write access to `url.value` from `request.transforms` in `httpjson` input. {pull}27937[27937]
- Add Base64 encoded HMAC and UUID template functions to `httpjson` input {pull}27873[27873]
- Release checkpoint module as GA. {pull}27814[27814]
- Move processing to ingest node for AWS vpcflow fileset. {pull}28168[28168]

*Heartbeat*

Expand Down
28 changes: 15 additions & 13 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,14 +741,18 @@ def assert_explicit_ecs_version_set(self, module, fileset):
Assert that the module explicitly sets the ECS version field.
"""
def get_config_paths(modules_path, module, fileset):
pathname = os.path.abspath(modules_path +
"/" +
module +
"/" +
fileset +
"/" +
"config/*.yml")
return glob.glob(pathname)
fileset_path = os.path.abspath(modules_path +
"/" +
module +
"/" +
fileset +
"/")
paths = []
for x in ["config/*.yml", "ingest/*.yml", "ingest/*.json"]:
pathname = os.path.join(fileset_path, x)
paths.extend(glob.glob(pathname))

return paths

def is_ecs_version_set(path):
# parsing the yml file would be better but go templates in
Expand All @@ -759,9 +763,7 @@ def is_ecs_version_set(path):
return True
return False

errors = []
for cfg_path in get_config_paths(self.modules_path, module, fileset):
if not is_ecs_version_set(cfg_path):
errors.append("{}".format(cfg_path))
if len(errors) > 0:
raise Exception("{}/{} ecs.version not explicitly set in:\n{}".format(module, fileset, '\n'.join(errors)))
if is_ecs_version_set(cfg_path):
return
raise Exception("{}/{} ecs.version not explicitly set in config or pipeline".format(module, fileset))
123 changes: 0 additions & 123 deletions x-pack/filebeat/module/aws/vpcflow/config/input.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,126 +76,3 @@ exclude_files: [".gz$"]
{{ end }}
tags: {{.tags | tojson}}
publisher_pipeline.disable_host: {{ inList .tags "forwarded" }}

processors:
- drop_event:
when.regexp.message: "^version"
- drop_event:
when.regexp.message: "^instance-id"

- script:
lang: javascript
source: >
function process(event) {
var message = event.Get("message");
var tokens = message.split(" ").length;
event.Put("@metadata.message_token_count", tokens);
}
# Default vpc flow log format
- dissect:
when:
equals:
'@metadata.message_token_count': 14
field: message
target_prefix: aws.vpcflow
tokenizer: '%{version} %{account_id} %{interface_id} %{srcaddr} %{dstaddr} %{srcport} %{dstport} %{protocol} %{packets} %{bytes} %{start} %{end} %{action} %{log_status}'

# Custom flow log for traffic through a NAT gateway
- dissect:
when:
equals:
'@metadata.message_token_count': 6
field: message
target_prefix: aws.vpcflow
tokenizer: '%{instance_id} %{interface_id} %{srcaddr} %{dstaddr} %{pkt_srcaddr} %{pkt_dstaddr}'

# Custom flow log for traffic through a transit gateway
- dissect:
when:
equals:
'@metadata.message_token_count': 17
field: message
target_prefix: aws.vpcflow
tokenizer: '%{version} %{interface_id} %{account_id} %{vpc_id} %{subnet_id} %{instance_id} %{srcaddr} %{dstaddr} %{srcport} %{dstport} %{protocol} %{tcp_flags} %{type} %{pkt_srcaddr} %{pkt_dstaddr} %{action} %{log_status}'

# TCP Flag Sequence
- dissect:
when:
equals:
'@metadata.message_token_count': 21
field: message
target_prefix: aws.vpcflow
tokenizer: '%{version} %{vpc_id} %{subnet_id} %{instance_id} %{interface_id} %{account_id} %{type} %{srcaddr} %{dstaddr} %{srcport} %{dstport} %{pkt_srcaddr} %{pkt_dstaddr} %{protocol} %{bytes} %{packets} %{start} %{end} %{action} %{tcp_flags} %{log_status}'

- convert:
ignore_missing: true
fields:
- {from: aws.vpcflow.srcaddr, to: source.address}
- {from: aws.vpcflow.srcaddr, to: source.ip, type: ip}
- {from: aws.vpcflow.srcport, to: source.port, type: long}
- {from: aws.vpcflow.dstaddr, to: destination.address}
- {from: aws.vpcflow.dstaddr, to: destination.ip, type: ip}
- {from: aws.vpcflow.dstport, to: destination.port, type: long}
- {from: aws.vpcflow.protocol, to: network.iana_number, type: string}
- {from: aws.vpcflow.packets, to: source.packets, type: long}
- {from: aws.vpcflow.bytes, to: source.bytes, type: long}
- {from: aws.vpcflow.packets, to: network.packets, type: long}
- {from: aws.vpcflow.bytes, to: network.bytes, type: long}

- drop_fields:
fields: ["aws.vpcflow.srcaddr", "aws.vpcflow.srcport", "aws.vpcflow.dstaddr", "aws.vpcflow.dstport", "aws.vpcflow.bytes", "aws.vpcflow.packets", "aws.vpcflow.protocol"]

- community_id: ~

# Use the aws.vpcflow.action value to set the event.outcome value to either "allow" or "deny".
- add_fields:
when.equals.aws.vpcflow.action: ACCEPT
target: event
fields: {outcome: allow}
- add_fields:
when.equals.aws.vpcflow.action: REJECT
target: event
fields: {outcome: deny}

- add_fields:
target: event
fields: {type: flow}
- add_fields:
target: event
fields: {category: network_traffic}

# Add network.type: ipv4 or ipv6
- if:
contains.source.ip: "."
then:
- add_fields:
target: network
fields: {type: ipv4}

- if:
contains.source.ip: ":"
then:
- add_fields:
target: network
fields: {type: ipv6}

# Add network.transport: based on IANA protocol number of the traffic
# http://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
- if:
equals.network.iana_number: "6"
then:
- add_fields:
target: network
fields: {transport: tcp}
- if:
equals.network.iana_number: "17"
then:
- add_fields:
target: network
fields: {transport: udp}

- add_fields:
target: ''
fields:
ecs.version: 1.11.0
Loading

0 comments on commit 6e69b05

Please sign in to comment.