Skip to content

Commit

Permalink
feat: introduce RingClient abstraction
Browse files Browse the repository at this point in the history
* lint and format
  • Loading branch information
trevorwhitney committed Aug 1, 2024
1 parent ce87fd8 commit afb8513
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 79 deletions.
14 changes: 7 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ const (

ringAutoForgetUnhealthyPeriods = 2

levelLabel = "detected_level"
LevelLabel = "detected_level"
LogLevelUnknown = "unknown"
logLevelDebug = "debug"
logLevelInfo = "info"
logLevelWarn = "warn"
logLevelError = "error"
logLevelFatal = "fatal"
logLevelCritical = "critical"
logLevelTrace = "trace"
logLevelUnknown = "unknown"
)

var (
Expand Down Expand Up @@ -406,9 +406,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
} else {
logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata)
}
if logLevel != logLevelUnknown && logLevel != "" {
if logLevel != LogLevelUnknown && logLevel != "" {
entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{
Name: levelLabel,
Name: LevelLabel,
Value: logLevel,
})
}
Expand Down Expand Up @@ -902,7 +902,7 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.
return logLevelInfo
}
if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) {
return logLevelUnknown
return LogLevelUnknown
} else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) {
return logLevelTrace
} else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) {
Expand All @@ -916,7 +916,7 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.
} else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) {
return logLevelFatal
}
return logLevelUnknown
return LogLevelUnknown
}

return extractLogLevelFromLogLine(entry.Line)
Expand Down Expand Up @@ -1016,5 +1016,5 @@ func detectLevelFromLogLine(log string) string {
if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") {
return logLevelDebug
}
return logLevelUnknown
return LogLevelUnknown
}
15 changes: 8 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/push"
loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -1577,7 +1578,7 @@ func Test_DetectLogLevels(t *testing.T) {
require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels)
require.Equal(t, push.LabelsAdapter{
{
Name: levelLabel,
Name: LevelLabel,
Value: logLevelWarn,
},
}, topVal.Streams[0].Entries[0].StructuredMetadata)
Expand All @@ -1594,7 +1595,7 @@ func Test_DetectLogLevels(t *testing.T) {
require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels)
sm := topVal.Streams[0].Entries[0].StructuredMetadata
require.Len(t, sm, 1)
require.Equal(t, sm[0].Name, levelLabel)
require.Equal(t, sm[0].Name, LevelLabel)
require.Equal(t, sm[0].Value, logLevelDebug)
})

Expand All @@ -1619,7 +1620,7 @@ func Test_DetectLogLevels(t *testing.T) {
Name: "severity",
Value: logLevelWarn,
}, {
Name: levelLabel,
Name: LevelLabel,
Value: logLevelWarn,
},
}, sm)
Expand Down Expand Up @@ -1662,7 +1663,7 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) {
entry: logproto.Entry{
Line: "foo",
},
expectedLogLevel: logLevelUnknown,
expectedLogLevel: LogLevelUnknown,
},
{
name: "non otlp with log level keywords in log line",
Expand Down Expand Up @@ -1746,7 +1747,7 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) {
entry: logproto.Entry{
Line: `foo=bar msg="message with keyword but it should not get picked up" level=NA`,
},
expectedLogLevel: logLevelUnknown,
expectedLogLevel: LogLevelUnknown,
},
{
name: "logfmt log line with label Severity is allowed for level detection",
Expand Down Expand Up @@ -1799,7 +1800,7 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) {

for i := 0; i < b.N; i++ {
level := extractLogLevelFromLogLine(logLine)
require.Equal(b, logLevelUnknown, level)
require.Equal(b, LogLevelUnknown, level)
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ type Loki struct {
IngesterRF1 ingester_rf1.Interface
IngesterRF1RingClient *ingester_rf1.RingClient
PatternIngester *pattern.Ingester
PatternRingClient *pattern.RingClient
PatternRingClient pattern.RingClient
Querier querier.Querier
cacheGenerationLoader queryrangebase.CacheGenNumberLoader
querierAPI *querier.QuerierAPI
Expand Down
18 changes: 9 additions & 9 deletions pkg/pattern/aggregation/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (

type Config struct {
// TODO(twhitney): This needs to be a per-tenant config
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
DownsamplePeriod time.Duration `yaml:"downsample_period"`
LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."`
WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."`
PushPeriod time.Duration `yaml:"push_period,omitempty" doc:"description=How long to wait in between pushes to Loki."`
HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,omitempty" doc:"description=The HTTP client configuration for pushing metrics to Loki."`
UseTLS bool `yaml:"use_tls,omitempty" doc:"description=Whether to use TLS for pushing metrics to Loki."`
BasicAuth BasicAuth `yaml:"basic_auth,omitempty" doc:"description=The basic auth configuration for pushing metrics to Loki."`
BackoffConfig backoff.Config `yaml:"backoff_config,omitempty" doc:"description=The backoff configuration for pushing metrics to Loki."`
Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."`
DownsamplePeriod time.Duration `yaml:"downsample_period"`
LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."`
WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."`
PushPeriod time.Duration `yaml:"push_period,omitempty" doc:"description=How long to wait in between pushes to Loki."`
HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,omitempty" doc:"description=The HTTP client configuration for pushing metrics to Loki."`
UseTLS bool `yaml:"use_tls,omitempty" doc:"description=Whether to use TLS for pushing metrics to Loki."`
BasicAuth BasicAuth `yaml:"basic_auth,omitempty" doc:"description=The basic auth configuration for pushing metrics to Loki."`
BackoffConfig backoff.Config `yaml:"backoff_config,omitempty" doc:"description=The backoff configuration for pushing metrics to Loki."`
}

// RegisterFlags registers pattern ingester related flags.
Expand Down
25 changes: 25 additions & 0 deletions pkg/pattern/aggregation/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"sync"
"time"

"github.com/dustin/go-humanize"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/golang/snappy"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/util/build"
Expand Down Expand Up @@ -307,3 +310,25 @@ func (p *Push) send(ctx context.Context, payload []byte) (int, error) {

return status, err
}

func AggregatedMetricEntry(
ts model.Time,
totalBytes, totalCount uint64,
service string,
lbls labels.Labels,
) string {
byteString := humanize.Bytes(totalBytes)
base := fmt.Sprintf(
"ts=%d bytes=%s count=%d %s=%s",
ts.UnixNano(),
byteString,
totalCount,
push.LabelServiceName, service,
)

for _, l := range lbls {
base += fmt.Sprintf(" %s=%s", l.Name, l.Value)
}

return base
}
119 changes: 118 additions & 1 deletion pkg/pattern/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -22,7 +23,8 @@ import (
)

func TestSweepInstance(t *testing.T) {
ing, err := New(defaultIngesterTestConfig(t), "foo", nil, log.NewNopLogger())
ringClient := &fakeRingClient{}
ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger())
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
err = services.StartAndAwaitRunning(context.Background(), ing)
Expand Down Expand Up @@ -95,3 +97,118 @@ func defaultIngesterTestConfig(t testing.TB) Config {

return cfg
}

type fakeRingClient struct{}

func (f *fakeRingClient) Pool() *ring_client.Pool {
panic("not implemented")
}

func (f *fakeRingClient) StartAsync(_ context.Context) error {
panic("not implemented")
}

func (f *fakeRingClient) AwaitRunning(_ context.Context) error {
panic("not implemented")
}

func (f *fakeRingClient) StopAsync() {
panic("not implemented")
}

func (f *fakeRingClient) AwaitTerminated(_ context.Context) error {
panic("not implemented")
}

func (f *fakeRingClient) FailureCase() error {
panic("not implemented")
}

func (f *fakeRingClient) State() services.State {
panic("not implemented")
}

func (f *fakeRingClient) AddListener(_ services.Listener) {
panic("not implemented")
}

func (f *fakeRingClient) Ring() ring.ReadRing {
return &fakeRing{}
}

type fakeRing struct{}

// InstancesWithTokensCount returns the number of instances in the ring that have tokens.
func (f *fakeRing) InstancesWithTokensCount() int {
panic("not implemented") // TODO: Implement
}

// InstancesInZoneCount returns the number of instances in the ring that are registered in given zone.
func (f *fakeRing) InstancesInZoneCount(_ string) int {
panic("not implemented") // TODO: Implement
}

// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens.
func (f *fakeRing) InstancesWithTokensInZoneCount(_ string) int {
panic("not implemented") // TODO: Implement
}

// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring.
func (f *fakeRing) ZonesCount() int {
panic("not implemented") // TODO: Implement
}

func (f *fakeRing) Get(
_ uint32,
_ ring.Operation,
_ []ring.InstanceDesc,
_ []string,
_ []string,
) (ring.ReplicationSet, error) {
panic("not implemented")
}

func (f *fakeRing) GetAllHealthy(_ ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
}

func (f *fakeRing) GetReplicationSetForOperation(_ ring.Operation) (ring.ReplicationSet, error) {
return ring.ReplicationSet{}, nil
}

func (f *fakeRing) ReplicationFactor() int {
panic("not implemented")
}

func (f *fakeRing) InstancesCount() int {
panic("not implemented")
}

func (f *fakeRing) ShuffleShard(_ string, _ int) ring.ReadRing {
panic("not implemented")
}

func (f *fakeRing) GetInstanceState(_ string) (ring.InstanceState, error) {
panic("not implemented")
}

func (f *fakeRing) ShuffleShardWithLookback(
_ string,
_ int,
_ time.Duration,
_ time.Time,
) ring.ReadRing {
panic("not implemented")
}

func (f *fakeRing) HasInstance(_ string) bool {
panic("not implemented")
}

func (f *fakeRing) CleanupShuffleShardCache(_ string) {
panic("not implemented")
}

func (f *fakeRing) GetTokenRangesForInstance(_ string) (ring.TokenRanges, error) {
panic("not implemented")
}
10 changes: 5 additions & 5 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (cfg *Config) Validate() error {
type Ingester struct {
services.Service
lifecycler *ring.Lifecycler
ringClient *RingClient
ringClient RingClient

lifecyclerWatcher *services.FailureWatcher

Expand All @@ -92,7 +92,7 @@ type Ingester struct {

func New(
cfg Config,
ringClient *RingClient,
ringClient RingClient,
metricsNamespace string,
registerer prometheus.Registerer,
logger log.Logger,
Expand Down Expand Up @@ -172,7 +172,7 @@ func (i *Ingester) stopping(_ error) error {
flushQueue.Close()
}
i.flushQueuesDone.Wait()
i.stopWriters()
i.stopWriters()
return err
}

Expand Down Expand Up @@ -204,7 +204,7 @@ func (i *Ingester) loop() {
flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j)
defer flushTicker.Stop()

if i.cfg.MetricAggregation.Enabled {
if i.cfg.MetricAggregation.Enabled {
downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod)
defer downsampleTicker.Stop()
for {
Expand Down Expand Up @@ -335,7 +335,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
i.drainCfg,
i.ringClient,
i.lifecycler.ID,
writer,
writer,
)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit afb8513

Please sign in to comment.