Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new source): New fluent source #7548

Merged
merged 18 commits into from
Jun 7, 2021
Merged
19 changes: 19 additions & 0 deletions .github/workflows/integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,25 @@ jobs:
- run: make slim-builds
- run: make test-integration-elasticsearch

test-integration-fluent:
name: Integration - Linux, Fluent
runs-on: ubuntu-20.04
steps:
- uses: actions/[email protected]
- run: make ci-sweep
- uses: actions/[email protected]
with:
path: |
~/.cargo/registry
~/.cargo/git
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}
- run: sudo bash scripts/environment/bootstrap-ubuntu-20.04.sh
- run: bash scripts/environment/prepare.sh
- run: echo "::add-matcher::.github/matchers/rust.json"
- run: make slim-builds
- run: make test-integration-fluent


test-integration-gcp:
name: Integration - Linux, GCP
runs-on: ubuntu-20.04
Expand Down
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,14 @@ tower-layer = { version = "0.3.1", default-features = false }

# Serde
serde = { version = "1.0.126", default-features = false, features = ["derive"] }
serde_bytes = { version = "0.11.5", default-features = false, features = ["std"], optional = true }
serde_json = { version = "1.0.64", default-features = false, features = ["raw_value"] }
serde_yaml = { version = "0.8.17", default-features = false }
rmp-serde = { version = "0.15.4", default-features = false, optional = true }

# Messagepack

rmpv = { version = "0.4.7", default-features = false, features = ["with-serde"], optional = true }

# Prost
prost = { version = "0.7.0", default-features = false, features = ["std"] }
Expand Down Expand Up @@ -350,6 +356,8 @@ wasm = ["lucet-runtime", "lucet-wasi", "lucetc", "vector-wasm"]
# transforms and sinks should depend on this feature.
kubernetes = ["k8s-openapi", "evmap"]

docker = ["bollard", "dirs-next"]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New feature to extract some of the bits from the docker_logs source to share with the fluent integration tests.


# API
api = [
"async-graphql",
Expand Down Expand Up @@ -380,6 +388,7 @@ sources-logs = [
"sources-docker_logs",
"sources-exec",
"sources-file",
"sources-fluent",
"sources-generator",
"sources-heroku_logs",
"sources-http",
Expand Down Expand Up @@ -412,9 +421,10 @@ sources-aws_kinesis_firehose = ["base64", "infer", "sources-utils-tls", "warp"]
sources-aws_s3 = ["rusoto", "rusoto_s3", "rusoto_sqs", "semver", "uuid"]
sources-datadog = ["sources-utils-http"]
sources-dnstap = ["bytesize", "base64", "data-encoding", "trust-dns-proto", "dnsmsg-parser", "tonic-build", "prost-build"]
sources-docker_logs = ["bollard", "dirs-next"]
sources-docker_logs = ["docker"]
sources-exec = []
sources-file = ["bytesize", "file-source"]
sources-fluent = ["base64", "bytesize", "listenfd", "tokio-util/net", "rmpv", "rmp-serde", "sources-utils-tcp-keepalive", "sources-utils-tcp-socket", "sources-utils-tls", "serde_bytes"]
sources-generator = ["fakedata"]
sources-heroku_logs = ["sources-utils-http"]
sources-host_metrics = ["heim"]
Expand Down Expand Up @@ -603,6 +613,7 @@ all-integration-tests = [
"clickhouse-integration-tests",
"docker-logs-integration-tests",
"es-integration-tests",
"fluent-integration-tests",
"gcp-cloud-storage-integration-tests",
"gcp-integration-tests",
"gcp-pubsub-integration-tests",
Expand Down Expand Up @@ -642,6 +653,7 @@ aws-sqs-integration-tests = ["sinks-aws_sqs"]
clickhouse-integration-tests = ["sinks-clickhouse", "warp"]
docker-logs-integration-tests = ["sources-docker_logs", "unix"]
es-integration-tests = ["sinks-elasticsearch"]
fluent-integration-tests = ["docker", "sources-fluent", "uuid"]
gcp-cloud-storage-integration-tests = ["sinks-gcp"]
gcp-integration-tests = ["sinks-gcp"]
gcp-pubsub-integration-tests = ["sinks-gcp"]
Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ test-behavior: ## Runs behaviorial test
.PHONY: test-integration
test-integration: ## Runs all integration tests
test-integration: test-integration-aws test-integration-clickhouse test-integration-docker-logs test-integration-elasticsearch
test-integration: test-integration-gcp test-integration-humio test-integration-influxdb test-integration-kafka
test-integration: test-integration-fluent test-integration-gcp test-integration-humio test-integration-influxdb test-integration-kafka
test-integration: test-integration-loki test-integration-mongodb_metrics test-integration-nats
test-integration: test-integration-nginx test-integration-postgresql_metrics test-integration-prometheus test-integration-pulsar
test-integration: test-integration-splunk test-integration-dnstap
Expand Down Expand Up @@ -346,6 +346,10 @@ ifeq ($(AUTODESPAWN), true)
@scripts/setup_integration_env.sh elasticsearch stop
endif

.PHONY: test-integration-fluent
test-integration-fluent: ## Runs Fluent integration tests
${MAYBE_ENVIRONMENT_EXEC} cargo test --no-fail-fast --no-default-features --features fluent-integration-tests --lib ::fluent:: -- --nocapture

.PHONY: test-integration-gcp
test-integration-gcp: ## Runs GCP integration tests
ifeq ($(AUTOSPAWN), true)
Expand Down
223 changes: 223 additions & 0 deletions docs/reference/components/sources/fluent.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
package metadata

components: sources: fluent: {
_port: 24224

title: "Fluent"

classes: {
commonly_used: true
delivery: "best_effort"
deployment_roles: ["sidecar", "aggregator"]
development: "beta"
egress_method: "stream"
stateful: false
}

features: {
receive: {
from: {
service: services.fluent

interface: socket: {
api: {
title: "Fluent"
url: urls.fluent
}
direction: "incoming"
port: _port
protocols: ["tcp"]
ssl: "optional"
}
}
receive_buffer_bytes: {
enabled: true
}
keepalive: enabled: true
tls: sources.socket.features.receive.tls
}
multiline: enabled: false
}

support: {
targets: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"armv7-unknown-linux-gnueabihf": true
"armv7-unknown-linux-musleabihf": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msv": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}
requirements: []
warnings: []
notices: []
}

installation: {
platform_name: null
}

configuration: {
address: {
description: "The address to listen for TCP connections on."
required: true
warnings: []
type: string: {
examples: ["0.0.0.0:\(_port)"]
syntax: "literal"
}
}
}

output: logs: line: {
description: "A Fluent message"
fields: {
host: {
description: "The IP address the fluent message was sent from."
required: true
type: string: {
examples: ["127.0.0.1"]
syntax: "literal"
}
}
timestamp: {
description: "The timestamp extracted from the fluent message."
required: true
type: timestamp: {}
}
tag: {
description: "The tag from the fluent message."
required: true
type: string: {
examples: ["dummy.0"]
syntax: "literal"
}
}
"*": {
description: "In addition to the defined fields, all fields from the fluent message are inserted as root level fields."
required: true
type: string: {
examples: ["hello world"]
syntax: "literal"
}
}
}
}

examples: [
{
title: "Dummy message from fluentd"
configuration: {}
input: """
```text
2021-05-20 16:23:03.021497000 -0400 dummy: {"message":"dummy"}
```

(this is the fluentd stdout encoding of the dummy message)
"""
output: log: {
host: _values.remote_host
timestamp: "2021-05-20T20:23:03.021497Z"
tag: "dummy"
message: "dummy"
}
},
{
title: "Dummy message from fluent-bit"
configuration: {}
input: """
```text
dummy.0: [1621541848.161827000, {"message"=>"dummy"}]
```

(this is the fluent-bit stdout encoding of the dummy message)
"""
output: log: {
host: _values.remote_host
timestamp: "2020-05-20T20:17:28.161827Z"
tag: "dummy.0"
message: "dummy"
}
},
]

how_it_works: {
aggregator: {
title: "Sending data from fluent agents to Vector aggregators"
body: """
If you are already running fluent agents (Fluentd or Fluent Bit) in your infrastructure, this source can
make it easy to start getting that data into Vector.
"""
}

fluentd_configuartion: {
title: "Fluentd configuration"
body: """
To configure Fluentd to forward to a Vector instance, you can use the following output configuration:

```text
<match *>
@type forward
<server>
# update these to point to your vector instance
name local
host 127.0.0.1
port 24224
</server>
compress gzip
</match>
```
"""
}

fluentbit_configuration: {
title: "Fluent Bit configuration"
body: """
To configure Fluent Bit to forward to a Vector instance, you can use the following output configuration:

```text
[OUTPUT]
Name forward
Match *
# update these to point to your vector instance
Host 127.0.0.1
Port 24224
```
"""
}

secure_mode: {
title: "Secure forward mode support"
body: """
The `fluent` source currently supports using TLS, but does not support the authentication part of the
Fluent protocol including:

- Shared key
- Username and password

And so these options of the secure forward output plugins for Fluent and Fluent Bit cannot be used.

If you would find this useful, [please let us know](\(urls.vector_repo)/issues/7532).
"""
}

acking: {
title: "Acknowledgement support"
body: """
The `fluent` source currently does not support the acknowledgement parts of the Fluent protocol and so
the `require_ack_response` option forward output plugins for Fluent and Fluent Bit cannot be used.

If you would find this useful, [please let us know](\(urls.vector_repo)/issues/7533).
"""
}
}

telemetry: metrics: {
events_in_total: components.sources.internal_metrics.output.metrics.events_in_total
decode_errors_total: components.sources.internal_metrics.output.metrics.decode_errors_total
processed_bytes_total: components.sources.internal_metrics.output.metrics.processed_bytes_total
processed_events_total: components.sources.internal_metrics.output.metrics.processed_events_total
}
}
6 changes: 6 additions & 0 deletions docs/reference/components/sources/internal_metrics.cue
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,12 @@ components: sources: internal_metrics: {
default_namespace: "vector"
tags: _component_tags
}
decode_errors_total: {
description: "The total number of decode errors seen when decoding data in a source component."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
k8s_format_picker_edge_cases_total: {
description: "The total number of edge cases encountered while picking format of the Kubernetes log message."
type: "counter"
Expand Down
Loading