Skip to content

Commit

Permalink
feat(lambda/promtail): support dropping labels (grafana#10755)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

**Which issue(s) this PR fixes**:
Fixes grafana#10669 

**Special notes for your reviewer**:

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)

---------

Signed-off-by: hainenber <[email protected]>
Co-authored-by: Michel Hollands <[email protected]>
  • Loading branch information
2 people authored and rhnasc committed Apr 12, 2024
1 parent e8e6519 commit c313ee8
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

* [10416](https://github.com/grafana/loki/pull/10416) **lpugoy**: Lambda-Promtail: Add support for WAF logs in S3
* [10301](https://github.com/grafana/loki/pull/10301) **wildum**: users can now define `additional_fields` in cloudflare configuration.
* [10755](https://github.com/grafana/loki/pull/10755) **hainenber**: Lambda-Promtail: Add support for dropping labels passed via env var

##### Changes

Expand Down
4 changes: 2 additions & 2 deletions tools/lambda-promtail/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ Then use Terraform to deploy:

```bash
## use cloudwatch log group
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'bearer_token=<bearer-token>' -var 'log_group_names=["log-group-01", "log-group-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'bearer_token=<bearer-token>' -var 'log_group_names=["log-group-01", "log-group-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var 'drop_labels="name1,name2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
```

```bash
## use kinesis data stream
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'kinesis_stream_name=["kinesis-stream-01", "kinesis-stream-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
terraform apply -var "<ecr-repo>:<tag>" -var "write_address=https://your-loki-url/loki/api/v1/push" -var "password=<basic-auth-pw>" -var "username=<basic-auth-username>" -var 'kinesis_stream_name=["kinesis-stream-01", "kinesis-stream-02"]' -var 'extra_labels="name1,value1,name2,value2"' -var 'drop_labels="name1,name2"' -var "tenant_id=<value>" -var 'skip_tls_verify="false"'
```

or CloudFormation:
Expand Down
2 changes: 1 addition & 1 deletion tools/lambda-promtail/lambda-promtail/cw.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(data.LogStream)
}

labels = applyExtraLabels(labels)
labels = applyLabels(labels)

for _, event := range data.LogEvents {
timestamp := time.UnixMilli(event.Timestamp)
Expand Down
2 changes: 1 addition & 1 deletion tools/lambda-promtail/lambda-promtail/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent)
model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
}

labels = applyExtraLabels(labels)
labels = applyLabels(labels)

// Check if the data is gzipped by inspecting the 'data' field
if isGzipped(record.Kinesis.Data) {
Expand Down
50 changes: 39 additions & 11 deletions tools/lambda-promtail/lambda-promtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,19 @@ const (

maxErrMsgLen = 1024

invalidExtraLabelsError = "Invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
invalidExtraLabelsError = "invalid value for environment variable EXTRA_LABELS. Expected a comma separated list with an even number of entries. "
)

var (
writeAddress *url.URL
username, password, extraLabelsRaw, tenantID, bearerToken string
keepStream bool
batchSize int
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
skipTlsVerify bool
printLogLine bool
writeAddress *url.URL
username, password, extraLabelsRaw, dropLabelsRaw, tenantID, bearerToken string
keepStream bool
batchSize int
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
dropLabels []model.LabelName
skipTlsVerify bool
printLogLine bool
)

func setupArguments() {
Expand All @@ -60,6 +61,11 @@ func setupArguments() {
panic(err)
}

dropLabels, err = getDropLabels()
if err != nil {
panic(err)
}

username = os.Getenv("USERNAME")
password = os.Getenv("PASSWORD")
// If either username or password is set then both must be.
Expand Down Expand Up @@ -128,8 +134,30 @@ func parseExtraLabels(extraLabelsRaw string, omitPrefix bool) (model.LabelSet, e
return extractedLabels, nil
}

func applyExtraLabels(labels model.LabelSet) model.LabelSet {
return labels.Merge(extraLabels)
func getDropLabels() ([]model.LabelName, error) {
var result []model.LabelName

dropLabelsRaw = os.Getenv("DROP_LABELS")
dropLabelsRawSplit := strings.Split(dropLabelsRaw, ",")
for _, dropLabelRaw := range dropLabelsRawSplit {
dropLabel := model.LabelName(dropLabelRaw)
if !dropLabel.IsValid() {
return []model.LabelName{}, fmt.Errorf("invalid label name %s", dropLabelRaw)
}
result = append(result, dropLabel)
}

return result, nil
}

func applyLabels(labels model.LabelSet) model.LabelSet {
finalLabels := labels.Merge(extraLabels)

for _, dropLabel := range dropLabels {
delete(finalLabels, dropLabel)
}

return finalLabels
}

func checkEventType(ev map[string]interface{}) (interface{}, error) {
Expand Down
25 changes: 25 additions & 0 deletions tools/lambda-promtail/lambda-promtail/main_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"os"
"testing"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -34,3 +35,27 @@ func TestLambdaPromtail_TestParseLabelsNoneProvided(t *testing.T) {
require.Len(t, extraLabels, 0)
require.Nil(t, err)
}

func TestLambdaPromtail_TestDropLabels(t *testing.T) {
os.Setenv("DROP_LABELS", "A1,A2")

// Reset the shared global variables
defer func() {
os.Unsetenv("DROP_LABELS")
dropLabels = []model.LabelName{}
}()

var err error
dropLabels, err = getDropLabels()
require.Nil(t, err)
require.Contains(t, dropLabels, model.LabelName("A1"))

defaultLabelSet := model.LabelSet{
model.LabelName("default"): model.LabelValue("default"),
model.LabelName("A1"): model.LabelValue("A1"),
model.LabelName("B2"): model.LabelValue("B2"),
}
modifiedLabels := applyLabels(defaultLabelSet)
require.NotContains(t, modifiedLabels, model.LabelName("A1"))
require.Contains(t, modifiedLabels, model.LabelName("B2"))
}
8 changes: 5 additions & 3 deletions tools/lambda-promtail/lambda-promtail/promtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ func (b *batch) createPushRequest() (*logproto.PushRequest, int) {
}

func (b *batch) flushBatch(ctx context.Context) error {
err := b.client.sendToPromtail(ctx, b)
if err != nil {
return err
if b.client != nil {
err := b.client.sendToPromtail(ctx, b)
if err != nil {
return err
}
}
b.resetBatch()

Expand Down
17 changes: 9 additions & 8 deletions tools/lambda-promtail/lambda-promtail/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]),
}

ls = applyExtraLabels(ls)
ls = applyLabels(ls)

// extract the timestamp of the nested event and sends the rest as raw json
if labels["type"] == CLOUDTRAIL_LOG_TYPE {
Expand Down Expand Up @@ -341,13 +341,14 @@ func stringToRawEvent(body string) (map[string]interface{}, error) {
// It also makes use of the fact that the log10 of a number in base 10 is its number of digits - 1.
// It returns early if the fractional seconds is 0 because getting the log10 of 0 results in -Inf.
// For example, given a string 1234567890123:
// iLog10 = 12 // the parsed int is 13 digits long
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
// fractionalSec = 123 // the rest of the parsed int
// fractionalSecLog10 = 2 // it is 3 digits long
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
// nsec = 123000000 // this is the nanoseconds part of the Unix time
//
// iLog10 = 12 // the parsed int is 13 digits long
// multiplier = 0.001 // to get the seconds part it must be divided by 1000
// sec = 1234567890123 * 0.001 = 1234567890 // this is the seconds part of the Unix time
// fractionalSec = 123 // the rest of the parsed int
// fractionalSecLog10 = 2 // it is 3 digits long
// multiplier = 1000000 // nano is 10^-9, so the nanoseconds part is 9 digits long
// nsec = 123000000 // this is the nanoseconds part of the Unix time
func getUnixSecNsec(s string) (sec int64, nsec int64, err error) {
const (
UNIX_SEC_LOG10 = 9
Expand Down
1 change: 1 addition & 0 deletions tools/lambda-promtail/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ resource "aws_lambda_function" "this" {
KEEP_STREAM = var.keep_stream
BATCH_SIZE = var.batch_size
EXTRA_LABELS = var.extra_labels
DROP_LABELS = var.drop_labels
OMIT_EXTRA_LABELS_PREFIX = var.omit_extra_labels_prefix ? "true" : "false"
TENANT_ID = var.tenant_id
SKIP_TLS_VERIFY = var.skip_tls_verify
Expand Down
6 changes: 6 additions & 0 deletions tools/lambda-promtail/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ variable "extra_labels" {
default = ""
}

variable "drop_labels" {
type = string
description = "Comma separated list of labels to be drop, in the format 'name1,name2,...,nameN' to be omitted to entries forwarded by lambda-promtail."
default = ""
}

variable "omit_extra_labels_prefix" {
type = bool
description = "Whether or not to omit the prefix `__extra_` from extra labels defined in the variable `extra_labels`."
Expand Down

0 comments on commit c313ee8

Please sign in to comment.