Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Jun 14, 2023
1 parent 5201794 commit 258a183
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 9 deletions.
10 changes: 10 additions & 0 deletions component/common/loki/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package loki

import (
"context"
"github.com/grafana/agent/pkg/river/token"
"github.com/grafana/agent/pkg/river/token/builder"
"sync"
"time"

Expand All @@ -27,6 +29,14 @@ const finalEntryTimeout = 5 * time.Second
// communication.
type LogsReceiver chan Entry

// TODO(piotr): Since LogsReceiver is not an interface, we can't extend it...
func (l LogsReceiver) RiverTokenize() []builder.Token {
return []builder.Token{{
Tok: token.STRING,
Lit: "loki.write.FIX_ME.receiver",
}}
}

// Entry is a log entry with labels.
type Entry struct {
Labels model.LabelSet
Expand Down
2 changes: 1 addition & 1 deletion converter/internal/common/convert_appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ConvertAppendable struct {

var _ storage.Appendable = (*ConvertAppendable)(nil)
var _ builder.Tokenizer = ConvertAppendable{}
var _ river.Capsule = ConvertTargets{}
var _ river.Capsule = ConvertAppendable{}

func (f ConvertAppendable) RiverCapsule() {}
func (f ConvertAppendable) RiverTokenize() []builder.Token {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
)

func TestConvert(t *testing.T) {
filepath.WalkDir("testdata", func(path string, d fs.DirEntry, _ error) error {
require.NoError(t, filepath.WalkDir("testdata", func(path string, d fs.DirEntry, _ error) error {
if d.IsDir() {
return nil
}
Expand Down Expand Up @@ -57,7 +57,7 @@ func TestConvert(t *testing.T) {
}

return nil
})
}))
}

// Replace '\r\n' with '\n'
Expand All @@ -71,8 +71,8 @@ func parseErrors(t *testing.T, errors []byte) diag.Diagnostics {

errorsString := string(normalizeLineEndings(errors))
splitErrors := strings.Split(errorsString, "\n")
for _, error := range splitErrors {
parsedError := strings.Split(error, " | ")
for _, err := range splitErrors {
parsedError := strings.Split(err, " | ")
if len(parsedError) != 2 {
require.FailNow(t, "invalid error format")
}
Expand Down
87 changes: 87 additions & 0 deletions converter/internal/promtailconvert/promtailconvert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package promtailconvert

import (
"bytes"
"fmt"
"github.com/grafana/agent/component/common/loki"
lokiwrite "github.com/grafana/agent/component/loki/write"
"github.com/grafana/agent/converter/internal/common"
"github.com/grafana/loki/clients/pkg/promtail/client"
lokiflag "github.com/grafana/loki/pkg/util/flagext"
"gopkg.in/yaml.v2"

"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/pkg/river/token/builder"
promtailcfg "github.com/grafana/loki/clients/pkg/promtail/config"
)

// Convert implements a Promtail config converter.
func Convert(in []byte) ([]byte, diag.Diagnostics) {
var diags diag.Diagnostics

var cfg promtailcfg.Config
// TODO: this doesn't handle the defaults correctly. We'd need to import other Loki's packages to do that.
if err := yaml.UnmarshalStrict(in, &cfg); err != nil {
diags.Add(diag.SeverityLevelError, fmt.Sprintf("failed to parse Promtail config: %s", err))
return nil, diags
}

f := builder.NewFile()
diags = AppendAll(f, &cfg)

var buf bytes.Buffer
if _, err := f.WriteTo(&buf); err != nil {
diags.Add(diag.SeverityLevelError, fmt.Sprintf("failed to render Flow config: %s", err.Error()))
return nil, diags
}
return buf.Bytes(), diags
}

// AppendAll analyzes the entire promtail config in memory and transforms it
// into Flow components. It then appends each argument to the file builder.
func AppendAll(f *builder.File, cfg *promtailcfg.Config) diag.Diagnostics {
var diags diag.Diagnostics

for i, c := range cfg.ClientConfigs {
appendLokiWrite(f, &c, i)
}

return diags
}

//func appendRemoteWrite(f *builder.File, promConfig *promconfig.Config) *remotewrite.Exports {
// remoteWriteArgs := toRemotewriteArguments(promConfig)
// common.AppendBlockWithOverride(f, []string{"prometheus", "remote_write"}, "default", remoteWriteArgs)
//
// return &remotewrite.Exports{
// Receiver: common.ConvertAppendable{Expr: "prometheus.remote_write.default.receiver"},
// }
//}

func appendLokiWrite(f *builder.File, client *client.Config, index int) *lokiwrite.Exports {
name := fmt.Sprintf("default_%d", index)
lokiWriteArgs := toLokiWriteArguments(client)
common.AppendBlockWithOverride(f, []string{"loki", "write"}, name, lokiWriteArgs)
return &lokiwrite.Exports{
Receiver: make(loki.LogsReceiver),
}
}

func toLokiWriteArguments(config *client.Config) *lokiwrite.Arguments {
return &lokiwrite.Arguments{
Endpoints: []lokiwrite.EndpointOptions{
{
URL: config.URL.String(),
},
},
ExternalLabels: convertLabels(config.ExternalLabels),
}
}

func convertLabels(labels lokiflag.LabelSet) map[string]string {
result := map[string]string{}
for k, v := range labels.LabelSet {
result[string(k)] = string(v)
}
return result
}
90 changes: 90 additions & 0 deletions converter/internal/promtailconvert/promtailconvert_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package promtailconvert_test

import (
"bytes"
"io/fs"
"os"
"path/filepath"
"strconv"
"strings"
"testing"

"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/promtailconvert"
"github.com/stretchr/testify/require"
)

const (
promSuffix = ".yaml"
flowSuffix = ".river"
errorsSuffix = ".errors"
)

func TestConvert(t *testing.T) {
require.NoError(t, filepath.WalkDir("testdata", func(path string, d fs.DirEntry, _ error) error {
if d.IsDir() {
return nil
}

if strings.HasSuffix(path, promSuffix) {
inputFile := path
inputBytes, err := os.ReadFile(inputFile)
require.NoError(t, err)

caseName := filepath.Base(path)
caseName = strings.TrimSuffix(caseName, promSuffix)

t.Run(caseName, func(t *testing.T) {
actual, diags := promtailconvert.Convert(inputBytes)

expectedDiags := diag.Diagnostics(nil)
errorFile := strings.TrimSuffix(path, promSuffix) + errorsSuffix
if _, err := os.Stat(errorFile); err == nil {
errorBytes, err := os.ReadFile(errorFile)
require.NoError(t, err)
expectedDiags = parseErrors(t, errorBytes)
}

require.Equal(t, expectedDiags, diags)

outputFile := strings.TrimSuffix(path, promSuffix) + flowSuffix
if _, err := os.Stat(outputFile); err == nil {
outputBytes, err := os.ReadFile(outputFile)
require.NoError(t, err)
require.Equal(t, string(normalizeLineEndings(outputBytes)), string(normalizeLineEndings(actual)))
}
})
}

return nil
}))
}

// Replace '\r\n' with '\n'
func normalizeLineEndings(data []byte) []byte {
normalized := bytes.ReplaceAll(data, []byte{'\r', '\n'}, []byte{'\n'})
return normalized
}

func parseErrors(t *testing.T, errors []byte) diag.Diagnostics {
var diags diag.Diagnostics

errorsString := string(normalizeLineEndings(errors))
splitErrors := strings.Split(errorsString, "\n")
for _, err := range splitErrors {
parsedError := strings.Split(err, " | ")
if len(parsedError) != 2 {
require.FailNow(t, "invalid error format")
}

severity, err := strconv.ParseInt(parsedError[0], 10, 8)
require.NoError(t, err)

// Some error messages have \n in them and need this
errorMessage := strings.ReplaceAll(parsedError[1], "\\n", "\n")

diags.Add(diag.Severity(severity), errorMessage)
}

return diags
}
1 change: 1 addition & 0 deletions converter/internal/promtailconvert/testdata/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
cloud_*
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
2 | failed to parse Promtail config: yaml: unmarshal errors:\n line 11: field not_a_thing not found in type config.Config
13 changes: 13 additions & 0 deletions converter/internal/promtailconvert/testdata/bad_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
clients:
- external_labels:
cluster: backyard-pine-treehouse-1
url: http://localhost/loki/api/v1/push
scrape_configs:
- job_name: kubernetes-pods
kubernetes_sd_configs:
- role: pod
pipeline_stages:
- cri: {}
not_a_thing: true
server:
profiling_enabled: true
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ require (
github.com/PuerkitoBio/rehttp v1.1.0
github.com/Shopify/sarama v1.38.1
github.com/alecthomas/kingpin/v2 v2.3.2
github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052
github.com/spaolacci/murmur3 v1.1.0
github.com/zeebo/xxh3 v1.0.2
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/aws/aws-sdk-go v1.44.187
github.com/aws/aws-sdk-go-v2 v1.17.2
Expand Down Expand Up @@ -207,7 +204,6 @@ require (
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/yaml v1.3.0
github.com/google/pprof v0.0.0-20230111200839-76d1ae5aea2b

)

Expand Down

0 comments on commit 258a183

Please sign in to comment.