Skip to content

Commit

Permalink
promtail confing conversion (#4160)
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr authored and clayton-cornell committed Aug 14, 2023
1 parent 9658607 commit eab9427
Show file tree
Hide file tree
Showing 45 changed files with 1,394 additions and 25 deletions.
5 changes: 5 additions & 0 deletions converter/diag/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (ds *Diagnostics) AddWithDetail(severity Severity, message string, detail s
})
}

// AddAll adds all given diagnostics to the diagnostics list.
func (ds *Diagnostics) AddAll(diags Diagnostics) {
*ds = append(*ds, diags...)
}

// Error implements error.
func (ds Diagnostics) Error() string {
var sb strings.Builder
Expand Down
8 changes: 7 additions & 1 deletion converter/internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ import (
// NewBlockWithOverride generates a new [*builder.Block] using a hook to
// override specific types.
func NewBlockWithOverride(name []string, label string, args component.Arguments) *builder.Block {
return NewBlockWithOverrideFn(name, label, args, getValueOverrideHook())
}

// NewBlockWithOverrideFn generates a new [*builder.Block] using a hook fn to
// override specific types.
func NewBlockWithOverrideFn(name []string, label string, args component.Arguments, fn builder.ValueOverrideHook) *builder.Block {
block := builder.NewBlock(name, label)
block.Body().SetValueOverrideHook(getValueOverrideHook())
block.Body().SetValueOverrideHook(fn)
block.Body().AppendFrom(args)
return block
}
Expand Down
2 changes: 1 addition & 1 deletion converter/internal/common/convert_appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ConvertAppendable struct {

var _ storage.Appendable = (*ConvertAppendable)(nil)
var _ builder.Tokenizer = ConvertAppendable{}
var _ river.Capsule = ConvertTargets{}
var _ river.Capsule = ConvertAppendable{}

func (f ConvertAppendable) RiverCapsule() {}
func (f ConvertAppendable) RiverTokenize() []builder.Token {
Expand Down
28 changes: 28 additions & 0 deletions converter/internal/common/convert_logs_receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package common

import (
"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/pkg/river"
"github.com/grafana/agent/pkg/river/token"
"github.com/grafana/agent/pkg/river/token/builder"
)

// ConvertLogsReceiver allows us to override how the loki.LogsReceiver is tokenized.
// See ConvertAppendable as another example with more details in comments.
type ConvertLogsReceiver struct {
loki.LogsReceiver

Expr string
}

var _ loki.LogsReceiver = (*ConvertLogsReceiver)(nil)
var _ builder.Tokenizer = ConvertLogsReceiver{}
var _ river.Capsule = ConvertLogsReceiver{}

func (f ConvertLogsReceiver) RiverCapsule() {}
func (f ConvertLogsReceiver) RiverTokenize() []builder.Token {
return []builder.Token{{
Tok: token.STRING,
Lit: f.Expr,
}}
}
19 changes: 19 additions & 0 deletions converter/internal/common/custom_tokenizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package common

import (
"github.com/grafana/agent/pkg/river/token"
"github.com/grafana/agent/pkg/river/token/builder"
)

type CustomTokenizer struct {
Expr string
}

var _ builder.Tokenizer = CustomTokenizer{}

func (f CustomTokenizer) RiverTokenize() []builder.Token {
return []builder.Token{{
Tok: token.STRING,
Lit: f.Expr,
}}
}
2 changes: 1 addition & 1 deletion converter/internal/prometheusconvert/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func toDiscoveryAzure(sdConfig *prom_azure.SDConfig) *azure.Arguments {
}

func validateDiscoveryAzure(sdConfig *prom_azure.SDConfig) diag.Diagnostics {
return validateHttpClientConfig(&sdConfig.HTTPClientConfig)
return ValidateHttpClientConfig(&sdConfig.HTTPClientConfig)
}

func toManagedIdentity(sdConfig *prom_azure.SDConfig) *azure.ManagedIdentity {
Expand Down
6 changes: 3 additions & 3 deletions converter/internal/prometheusconvert/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
prom_config "github.com/prometheus/common/config"
)

func toHttpClientConfig(httpClientConfig *prom_config.HTTPClientConfig) *config.HTTPClientConfig {
func ToHttpClientConfig(httpClientConfig *prom_config.HTTPClientConfig) *config.HTTPClientConfig {
if httpClientConfig == nil {
return nil
}
Expand All @@ -26,9 +26,9 @@ func toHttpClientConfig(httpClientConfig *prom_config.HTTPClientConfig) *config.
}
}

// validateHttpClientConfig returns [diag.Diagnostics] for currently
// ValidateHttpClientConfig returns [diag.Diagnostics] for currently
// unsupported Flow features available in Prometheus.
func validateHttpClientConfig(httpClientConfig *prom_config.HTTPClientConfig) diag.Diagnostics {
func ValidateHttpClientConfig(httpClientConfig *prom_config.HTTPClientConfig) diag.Diagnostics {
var diags diag.Diagnostics

if httpClientConfig.NoProxy != "" {
Expand Down
4 changes: 2 additions & 2 deletions converter/internal/prometheusconvert/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func appendDiscoveryConsul(pb *prometheusBlocks, label string, sdConfig *prom_co
}

func validateDiscoveryConsul(sdConfig *prom_consul.SDConfig) diag.Diagnostics {
return validateHttpClientConfig(&sdConfig.HTTPClientConfig)
return ValidateHttpClientConfig(&sdConfig.HTTPClientConfig)
}

func toDiscoveryConsul(sdConfig *prom_consul.SDConfig) *consul.Arguments {
Expand All @@ -43,6 +43,6 @@ func toDiscoveryConsul(sdConfig *prom_consul.SDConfig) *consul.Arguments {
ServiceTags: sdConfig.ServiceTags,
NodeMeta: sdConfig.NodeMeta,
RefreshInterval: time.Duration(sdConfig.RefreshInterval),
HTTPClientConfig: *toHttpClientConfig(&sdConfig.HTTPClientConfig),
HTTPClientConfig: *ToHttpClientConfig(&sdConfig.HTTPClientConfig),
}
}
2 changes: 1 addition & 1 deletion converter/internal/prometheusconvert/digitalocean.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func validateDiscoveryDigitalOcean(sdConfig *prom_digitalocean.SDConfig) diag.Di
diags.Add(diag.SeverityLevelError, "unsupported oauth2 for digitalocean_sd_configs")
}

newDiags := validateHttpClientConfig(&sdConfig.HTTPClientConfig)
newDiags := ValidateHttpClientConfig(&sdConfig.HTTPClientConfig)

diags = append(diags, newDiags...)
return diags
Expand Down
4 changes: 2 additions & 2 deletions converter/internal/prometheusconvert/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func appendDiscoveryDocker(pb *prometheusBlocks, label string, sdConfig *prom_do
}

func validateDiscoveryDocker(sdConfig *prom_docker.DockerSDConfig) diag.Diagnostics {
return validateHttpClientConfig(&sdConfig.HTTPClientConfig)
return ValidateHttpClientConfig(&sdConfig.HTTPClientConfig)
}

func toDiscoveryDocker(sdConfig *prom_docker.DockerSDConfig) *docker.Arguments {
Expand All @@ -33,7 +33,7 @@ func toDiscoveryDocker(sdConfig *prom_docker.DockerSDConfig) *docker.Arguments {
HostNetworkingHost: sdConfig.HostNetworkingHost,
RefreshInterval: time.Duration(sdConfig.RefreshInterval),
Filters: toDockerFilters(sdConfig.Filters),
HTTPClientConfig: *toHttpClientConfig(&sdConfig.HTTPClientConfig),
HTTPClientConfig: *ToHttpClientConfig(&sdConfig.HTTPClientConfig),
}
}

Expand Down
8 changes: 4 additions & 4 deletions converter/internal/prometheusconvert/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (
)

func appendDiscoveryKubernetes(pb *prometheusBlocks, label string, sdConfig *prom_kubernetes.SDConfig) discovery.Exports {
discoveryKubernetesArgs := toDiscoveryKubernetes(sdConfig)
discoveryKubernetesArgs := ToDiscoveryKubernetes(sdConfig)
name := []string{"discovery", "kubernetes"}
block := common.NewBlockWithOverride(name, label, discoveryKubernetesArgs)
pb.discoveryBlocks = append(pb.discoveryBlocks, newPrometheusBlock(block, name, label, "", ""))
return newDiscoverExports("discovery.kubernetes." + label + ".targets")
}

func validateDiscoveryKubernetes(sdConfig *prom_kubernetes.SDConfig) diag.Diagnostics {
return validateHttpClientConfig(&sdConfig.HTTPClientConfig)
return ValidateHttpClientConfig(&sdConfig.HTTPClientConfig)
}

func toDiscoveryKubernetes(sdConfig *prom_kubernetes.SDConfig) *kubernetes.Arguments {
func ToDiscoveryKubernetes(sdConfig *prom_kubernetes.SDConfig) *kubernetes.Arguments {
if sdConfig == nil {
return nil
}
Expand All @@ -30,7 +30,7 @@ func toDiscoveryKubernetes(sdConfig *prom_kubernetes.SDConfig) *kubernetes.Argum
APIServer: config.URL(sdConfig.APIServer),
Role: string(sdConfig.Role),
KubeConfig: sdConfig.KubeConfig,
HTTPClientConfig: *toHttpClientConfig(&sdConfig.HTTPClientConfig),
HTTPClientConfig: *ToHttpClientConfig(&sdConfig.HTTPClientConfig),
NamespaceDiscovery: toNamespaceDiscovery(&sdConfig.NamespaceDiscovery),
Selectors: toSelectorConfig(sdConfig.Selectors),
AttachMetadata: toAttachMetadata(&sdConfig.AttachMetadata),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
)

func TestConvert(t *testing.T) {
filepath.WalkDir("testdata", func(path string, d fs.DirEntry, _ error) error {
require.NoError(t, filepath.WalkDir("testdata", func(path string, d fs.DirEntry, _ error) error {
if d.IsDir() {
return nil
}
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestConvert(t *testing.T) {
}

return nil
})
}))
}

// Replace '\r\n' with '\n'
Expand Down
6 changes: 3 additions & 3 deletions converter/internal/prometheusconvert/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func toRelabelArguments(relabelConfigs []*prom_relabel.Config, forwardTo []stora

return &relabel.Arguments{
ForwardTo: forwardTo,
MetricRelabelConfigs: toRelabelConfigs(relabelConfigs),
MetricRelabelConfigs: ToFlowRelabelConfigs(relabelConfigs),
}
}

Expand All @@ -56,11 +56,11 @@ func appendDiscoveryRelabel(pb *prometheusBlocks, relabelConfigs []*prom_relabel
func toDiscoveryRelabelArguments(relabelConfigs []*prom_relabel.Config, targets []discovery.Target) *disc_relabel.Arguments {
return &disc_relabel.Arguments{
Targets: targets,
RelabelConfigs: toRelabelConfigs(relabelConfigs),
RelabelConfigs: ToFlowRelabelConfigs(relabelConfigs),
}
}

func toRelabelConfigs(relabelConfigs []*prom_relabel.Config) []*flow_relabel.Config {
func ToFlowRelabelConfigs(relabelConfigs []*prom_relabel.Config) []*flow_relabel.Config {
if len(relabelConfigs) == 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions converter/internal/prometheusconvert/remote_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func validateRemoteWriteConfig(remoteWriteConfig *prom_config.RemoteWriteConfig)
diags.Add(diag.SeverityLevelError, "unsupported remote_write sigv4 config was provided")
}

newDiags := validateHttpClientConfig(&remoteWriteConfig.HTTPClientConfig)
newDiags := ValidateHttpClientConfig(&remoteWriteConfig.HTTPClientConfig)
diags = append(diags, newDiags...)

return diags
Expand Down Expand Up @@ -74,10 +74,10 @@ func getEndpointOptions(remoteWriteConfigs []*prom_config.RemoteWriteConfig) []*
Headers: remoteWriteConfig.Headers,
SendExemplars: remoteWriteConfig.SendExemplars,
SendNativeHistograms: remoteWriteConfig.SendNativeHistograms,
HTTPClientConfig: toHttpClientConfig(&remoteWriteConfig.HTTPClientConfig),
HTTPClientConfig: ToHttpClientConfig(&remoteWriteConfig.HTTPClientConfig),
QueueOptions: toQueueOptions(&remoteWriteConfig.QueueConfig),
MetadataOptions: toMetadataOptions(&remoteWriteConfig.MetadataConfig),
WriteRelabelConfigs: toRelabelConfigs(remoteWriteConfig.WriteRelabelConfigs),
WriteRelabelConfigs: ToFlowRelabelConfigs(remoteWriteConfig.WriteRelabelConfigs),
}

endpoints = append(endpoints, endpoint)
Expand Down
4 changes: 2 additions & 2 deletions converter/internal/prometheusconvert/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func appendPrometheusScrape(pb *prometheusBlocks, scrapeConfig *prom_config.Scra
}

func validatePrometheusScrape(scrapeConfig *prom_config.ScrapeConfig) diag.Diagnostics {
return validateHttpClientConfig(&scrapeConfig.HTTPClientConfig)
return ValidateHttpClientConfig(&scrapeConfig.HTTPClientConfig)
}

func toScrapeArguments(scrapeConfig *prom_config.ScrapeConfig, forwardTo []storage.Appendable, targets []discovery.Target) *scrape.Arguments {
Expand All @@ -49,7 +49,7 @@ func toScrapeArguments(scrapeConfig *prom_config.ScrapeConfig, forwardTo []stora
LabelLimit: scrapeConfig.LabelLimit,
LabelNameLengthLimit: scrapeConfig.LabelNameLengthLimit,
LabelValueLengthLimit: scrapeConfig.LabelValueLengthLimit,
HTTPClientConfig: *toHttpClientConfig(&scrapeConfig.HTTPClientConfig),
HTTPClientConfig: *ToHttpClientConfig(&scrapeConfig.HTTPClientConfig),
ExtraMetrics: false,
Clustering: scrape.Clustering{Enabled: false},
}
Expand Down
42 changes: 42 additions & 0 deletions converter/internal/promtailconvert/internal/build/cloudflare.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package build

import (
"fmt"
"time"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/loki/source/cloudflare"
"github.com/grafana/agent/converter/internal/common"
"github.com/grafana/agent/pkg/river/rivertypes"
)

func (s *ScrapeConfigBuilder) AppendCloudFlareConfig() {
if s.cfg.CloudflareConfig == nil {
return
}

args := cloudflare.Arguments{
APIToken: rivertypes.Secret(s.cfg.CloudflareConfig.APIToken),
ZoneID: s.cfg.CloudflareConfig.ZoneID,
Labels: convertPromLabels(s.cfg.CloudflareConfig.Labels),
Workers: s.cfg.CloudflareConfig.Workers,
PullRange: time.Duration(s.cfg.CloudflareConfig.PullRange),
FieldsType: s.cfg.CloudflareConfig.FieldsType,
}
override := func(val interface{}) interface{} {
switch conv := val.(type) {
case []loki.LogsReceiver:
return common.CustomTokenizer{Expr: fmt.Sprintf("[%s]", s.getOrNewLokiRelabel())}
case rivertypes.Secret:
return string(conv)
default:
return val
}
}
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "cloudfare"},
s.cfg.JobName,
args,
override,
))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package build

import (
"time"

"github.com/grafana/agent/component/common/loki"
)

type GlobalContext struct {
WriteReceivers []loki.LogsReceiver
TargetSyncPeriod time.Duration
}
47 changes: 47 additions & 0 deletions converter/internal/promtailconvert/internal/build/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package build

import (
"fmt"
"time"

flowrelabel "github.com/grafana/agent/component/common/relabel"
"github.com/grafana/agent/component/loki/source/journal"
"github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/converter/internal/common"
)

func (s *ScrapeConfigBuilder) AppendJournalConfig() {
jc := s.cfg.JournalConfig
if jc == nil {
return
}
maxAge, err := time.ParseDuration(jc.MaxAge)
if err != nil {
s.diags.Add(
diag.SeverityLevelError,
fmt.Sprintf("failed to parse max_age duration for journal config: %s, will use default", err),
)
maxAge = time.Hour * 7 // use default value
}
args := journal.Arguments{
FormatAsJson: jc.JSON,
MaxAge: maxAge,
Path: jc.Path,
Receivers: s.getOrNewProcessStageReceivers(),
Labels: convertPromLabels(jc.Labels),
RelabelRules: flowrelabel.Rules{},
}
relabelRulesExpr := s.getOrNewDiscoveryRelabelRules()
hook := func(val interface{}) interface{} {
if _, ok := val.(flowrelabel.Rules); ok {
return common.CustomTokenizer{Expr: relabelRulesExpr}
}
return val
}
s.f.Body().AppendBlock(common.NewBlockWithOverrideFn(
[]string{"loki", "source", "journal"},
s.cfg.JobName,
args,
hook,
))
}
Loading

0 comments on commit eab9427

Please sign in to comment.