From f1f0eab05c04450af143a392b27c044ae4a179b3 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 9 Mar 2021 19:04:11 -0500 Subject: [PATCH 1/5] Add per tenant configs mechanism --- pkg/loki/loki.go | 14 +++++++++----- pkg/loki/modules.go | 8 ++++++++ pkg/loki/runtime_config.go | 15 +++++++++++++++ pkg/util/runtime/config.go | 33 +++++++++++++++++++++++++++++++++ 4 files changed, 65 insertions(+), 5 deletions(-) create mode 100644 pkg/util/runtime/config.go diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 2e2d384a9adfd..12ba1d4f83f5c 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -12,6 +12,7 @@ import ( "github.com/felixge/fgprof" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/modules" @@ -145,6 +146,7 @@ type Loki struct { Server *server.Server ring *ring.Ring overrides *validation.Overrides + tenantConfigs *runtime.TenantConfigs distributor *distributor.Distributor ingester *ingester.Ingester querier *querier.Querier @@ -344,6 +346,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(MemberlistKV, t.initMemberlistKV) mm.RegisterModule(Ring, t.initRing) mm.RegisterModule(Overrides, t.initOverrides) + mm.RegisterModule(TenantConfigs, t.initTenantConfigs) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(Store, t.initStore) mm.RegisterModule(Ingester, t.initIngester) @@ -360,12 +363,13 @@ func (t *Loki) setupModuleManager() error { deps := map[string][]string{ Ring: {RuntimeConfig, Server, MemberlistKV}, Overrides: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides}, + TenantConfigs: {RuntimeConfig}, + Distributor: {Ring, Server, Overrides, TenantConfigs}, Store: {Overrides}, - Ingester: {Store, Server, MemberlistKV}, - Querier: {Store, Ring, Server, IngesterQuerier}, - QueryFrontend: {Server, Overrides}, - Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides}, + Ingester: {Store, Server, MemberlistKV, TenantConfigs}, + Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs}, + QueryFrontend: {Server, Overrides, TenantConfigs}, + Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, TableManager: {Server}, Compactor: {Server}, IngesterQuerier: {Ring}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 52f522ac1af36..72a8d6c9e233f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/loki/pkg/ruler/manager" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" @@ -60,6 +61,7 @@ const ( Ring string = "ring" RuntimeConfig string = "runtime-config" Overrides string = "overrides" + TenantConfigs string = "tenant-configs" Server string = "server" Distributor string = "distributor" Ingester string = "ingester" @@ -140,6 +142,12 @@ func (t *Loki) initOverrides() (_ services.Service, err error) { return nil, err } +func (t *Loki) initTenantConfigs() (_ services.Service, err error) { + t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig)) + // tenantConfigs are not a service, since they don't have any operational state. + return nil, err +} + func (t *Loki) initDistributor() (services.Service, error) { t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV diff --git a/pkg/loki/runtime_config.go b/pkg/loki/runtime_config.go index 99ea284f34eb6..2f753df774af2 100644 --- a/pkg/loki/runtime_config.go +++ b/pkg/loki/runtime_config.go @@ -7,6 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "gopkg.in/yaml.v2" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -15,6 +16,7 @@ import ( // These values are then pushed to the components that are interested in them. type runtimeConfigValues struct { TenantLimits map[string]*validation.Limits `yaml:"overrides"` + TenantConfig map[string]*runtime.Config `yaml:"configs"` Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` } @@ -45,6 +47,19 @@ func tenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLi } } +func tenantConfigFromRuntimeConfig(c *runtimeconfig.Manager) runtime.TenantConfig { + if c == nil { + return nil + } + return func(userID string) *runtime.Config { + cfg, ok := c.GetConfig().(*runtimeConfigValues) + if !ok || cfg == nil { + return nil + } + return cfg.TenantConfig[userID] + } +} + func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-chan kv.MultiRuntimeConfig { if manager == nil { return nil diff --git a/pkg/util/runtime/config.go b/pkg/util/runtime/config.go new file mode 100644 index 0000000000000..8c7e93f8d7733 --- /dev/null +++ b/pkg/util/runtime/config.go @@ -0,0 +1,33 @@ +package runtime + +type Config struct { +} + +// TenantConfig is a function that returns configs for given tenant, or +// nil, if there are no tenant-specific configs. +type TenantConfig func(userID string) *Config + +// TenantConfigs periodically fetch a set of per-user configs, and provides convenience +// functions for fetching the correct value. +type TenantConfigs struct { + defaultConfig *Config + tenantConfig TenantConfig +} + +// NewTenantConfig makes a new TenantConfigs +func NewTenantConfigs(tenantConfig TenantConfig) (*TenantConfigs, error) { + return &TenantConfigs{ + defaultConfig: &Config{}, + tenantConfig: tenantConfig, + }, nil +} + +func (o *TenantConfigs) getOverridesForUser(userID string) *Config { + if o.tenantConfig != nil { + l := o.tenantConfig(userID) + if l != nil { + return l + } + } + return o.defaultConfig +} From e2dfbafc8f26b0c76c983ed127519c1417fe10cb Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 9 Mar 2021 19:22:19 -0500 Subject: [PATCH 2/5] Add per tenant logging of stream creation --- pkg/ingester/checkpoint_test.go | 22 +++++++++++----------- pkg/ingester/flush_test.go | 2 +- pkg/ingester/ingester.go | 11 +++++++---- pkg/ingester/ingester_test.go | 4 ++-- pkg/ingester/instance.go | 29 +++++++++++++++++++++-------- pkg/ingester/instance_test.go | 14 +++++++------- pkg/ingester/recovery_test.go | 4 ++-- pkg/loki/modules.go | 2 +- pkg/util/runtime/config.go | 5 +++++ 9 files changed, 57 insertions(+), 36 deletions(-) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 5aadc50290339..1d374e2d164ea 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -70,7 +70,7 @@ func TestIngesterWAL(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -113,7 +113,7 @@ func TestIngesterWAL(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -127,7 +127,7 @@ func TestIngesterWAL(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -152,7 +152,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -198,7 +198,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { require.NoError(t, err) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -258,7 +258,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -279,7 +279,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester, ensuring we replayed from WAL. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -303,7 +303,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -324,7 +324,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester, ensuring we can replay from the checkpoint as well. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -455,7 +455,7 @@ func Test_SeriesIterator(t *testing.T) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := 0; i < 3; i++ { - inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, nil, noopWAL{}, NilMetrics, nil) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) instances = append(instances, inst) @@ -505,7 +505,7 @@ func Benchmark_SeriesIterator(b *testing.B) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := range instances { - inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil) require.NoError(b, inst.Push(context.Background(), &logproto.PushRequest{ diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 982d4bce1f092..e6a30413975af 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -257,7 +257,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - ing, err := New(cfg, client.Config{}, store, limits, nil) + ing, err := New(cfg, client.Config{}, store, limits, nil, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9a05f4186b7b4..81cf22dadaf9d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper" errUtil "github.com/grafana/loki/pkg/util" listutil "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -121,8 +122,9 @@ func (cfg *Config) Validate() error { type Ingester struct { services.Service - cfg Config - clientConfig client.Config + cfg Config + clientConfig client.Config + tenantConfigs *runtime.TenantConfigs shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown instancesMtx sync.RWMutex @@ -168,7 +170,7 @@ type ChunkStore interface { } // New makes a new Ingester. -func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } @@ -178,6 +180,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid i := &Ingester{ cfg: cfg, clientConfig: clientConfig, + tenantConfigs: configs, instances: map[string]*instance{}, store: store, periodicConfigs: store.GetSchemaConfigs(), @@ -401,7 +404,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch) + inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 98b8bba0b65de..799c3061758ce 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -37,7 +37,7 @@ func TestIngester(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -219,7 +219,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, overrides, nil) + i, err := New(ingesterConfig, client.Config{}, store, overrides, nil, nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 268e002d797d0..7f8597392fa25 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -79,6 +80,7 @@ type instance struct { tailerMtx sync.RWMutex limiter *Limiter + configs *runtime.TenantConfigs wal WAL @@ -89,14 +91,7 @@ type instance struct { metrics *ingesterMetrics } -func newInstance( - cfg *Config, - instanceID string, - limiter *Limiter, - wal WAL, - metrics *ingesterMetrics, - flushOnShutdownSwitch *OnceSwitch, -) *instance { +func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch) *instance { i := &instance{ cfg: cfg, streams: map[string]*stream{}, @@ -110,6 +105,7 @@ func newInstance( tailers: map[uint32]*tailer{}, limiter: limiter, + configs: configs, wal: wal, metrics: metrics, @@ -209,6 +205,15 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r } if err != nil { + if i.configs.LogStreamCreation(i.instanceID) { + level.Info(util_log.Logger).Log( + "msg", "failed to create stream, exceeded limit", + "tenant", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) bytes := 0 for _, e := range pushReqStream.Entries { @@ -244,6 +249,14 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r i.streamsCreatedTotal.Inc() i.addTailersToNewStream(stream) + if i.configs.LogStreamCreation(i.instanceID) { + level.Info(util_log.Logger).Log( + "msg", "successfully created stream", + "tenant", i.instanceID, + "stream", pushReqStream.Labels, + ) + } + return stream, nil } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 7ce0bcf2946fb..696649681c589 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -38,7 +38,7 @@ func TestLabelsCollisions(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{}) + i := newInstance(defaultConfig(), "test", limiter, nil, noopWAL{}, nil, &OnceSwitch{}) // avoid entries from the future. tt := time.Now().Add(-5 * time.Minute) @@ -65,7 +65,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) const ( concurrent = 10 @@ -123,7 +123,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) lbls := makeRandomLabels() tt := time.Now() @@ -163,7 +163,7 @@ func Test_SeriesQuery(t *testing.T) { cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 - instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + instance := newInstance(cfg, "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) currentTime := time.Now() @@ -273,7 +273,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + i := newInstance(&Config{}, "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) ctx := context.Background() for n := 0; n < b.N; n++ { @@ -315,7 +315,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() - inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(&Config{}, "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil) require.NoError(b, err) for i := 0; i < 10000; i++ { @@ -365,7 +365,7 @@ func Test_Iterator(t *testing.T) { defaultLimits := defaultLimitsTestConfig() overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), noopWAL{}, NilMetrics, nil) + instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), nil, noopWAL{}, NilMetrics, nil) ctx := context.TODO() direction := logproto.BACKWARD limit := uint32(2) diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index 39a3f9149e6db..b9e8e6997a9c1 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -205,7 +205,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, nil, nil) require.NoError(t, err) mkSample := func(i int) *logproto.PushRequest { @@ -239,7 +239,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { require.Equal(t, false, iter.Next()) // create a new ingester now - i, err = New(ingesterConfig, client.Config{}, store, limits, nil) + i, err = New(ingesterConfig, client.Config{}, store, limits, nil, nil) require.NoError(t, err) // recover the checkpointed series diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 72a8d6c9e233f..d26a6e8e4015d 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -226,7 +226,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) { t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort - t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer) + t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer) if err != nil { return } diff --git a/pkg/util/runtime/config.go b/pkg/util/runtime/config.go index 8c7e93f8d7733..c1e7b773f57eb 100644 --- a/pkg/util/runtime/config.go +++ b/pkg/util/runtime/config.go @@ -1,6 +1,7 @@ package runtime type Config struct { + LogStreamCreation bool `yaml:"log_stream_creation"` } // TenantConfig is a function that returns configs for given tenant, or @@ -31,3 +32,7 @@ func (o *TenantConfigs) getOverridesForUser(userID string) *Config { } return o.defaultConfig } + +func (o *TenantConfigs) LogStreamCreation(userID string) bool { + return o.getOverridesForUser(userID).LogStreamCreation +} From 9b96692a641fe6ea9954d9aaa93b14d737149f4e Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 9 Mar 2021 19:53:21 -0500 Subject: [PATCH 3/5] fix tests --- pkg/ingester/checkpoint_test.go | 21 +++++++++++---------- pkg/ingester/flush_test.go | 3 ++- pkg/ingester/ingester_test.go | 5 +++-- pkg/ingester/instance_test.go | 15 ++++++++------- pkg/ingester/recovery_test.go | 5 +++-- pkg/util/runtime/config.go | 10 +++++++++- 6 files changed, 36 insertions(+), 23 deletions(-) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 1d374e2d164ea..e0063067e26cb 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -70,7 +71,7 @@ func TestIngesterWAL(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -113,7 +114,7 @@ func TestIngesterWAL(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -127,7 +128,7 @@ func TestIngesterWAL(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -152,7 +153,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -198,7 +199,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { require.NoError(t, err) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -258,7 +259,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -279,7 +280,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester, ensuring we replayed from WAL. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -303,7 +304,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -324,7 +325,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester, ensuring we can replay from the checkpoint as well. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -455,7 +456,7 @@ func Test_SeriesIterator(t *testing.T) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := 0; i < 3; i++ { - inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, nil, noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) instances = append(instances, inst) diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index e6a30413975af..f59871ed97168 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" @@ -257,7 +258,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - ing, err := New(cfg, client.Config{}, store, limits, nil, nil) + ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 799c3061758ce..36743044af3d0 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -37,7 +38,7 @@ func TestIngester(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, nil, nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -219,7 +220,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, overrides, nil, nil) + i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 696649681c589..c7e04fc255a06 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + loki_runtime "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -38,7 +39,7 @@ func TestLabelsCollisions(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(defaultConfig(), "test", limiter, nil, noopWAL{}, nil, &OnceSwitch{}) + i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}) // avoid entries from the future. tt := time.Now().Add(-5 * time.Minute) @@ -65,7 +66,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance(defaultConfig(), "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) const ( concurrent = 10 @@ -123,7 +124,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance(defaultConfig(), "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) lbls := makeRandomLabels() tt := time.Now() @@ -163,7 +164,7 @@ func Test_SeriesQuery(t *testing.T) { cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 - instance := newInstance(cfg, "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) + instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) currentTime := time.Now() @@ -273,7 +274,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) + i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) ctx := context.Background() for n := 0; n < b.N; n++ { @@ -315,7 +316,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() - inst := newInstance(&Config{}, "test", limiter, nil, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil) require.NoError(b, err) for i := 0; i < 10000; i++ { @@ -365,7 +366,7 @@ func Test_Iterator(t *testing.T) { defaultLimits := defaultLimitsTestConfig() overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), nil, noopWAL{}, NilMetrics, nil) + instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil) ctx := context.TODO() direction := logproto.BACKWARD limit := uint32(2) diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index b9e8e6997a9c1..a26533f633bcc 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" + loki_runtime "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -205,7 +206,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, nil, nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) mkSample := func(i int) *logproto.PushRequest { @@ -239,7 +240,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { require.Equal(t, false, iter.Next()) // create a new ingester now - i, err = New(ingesterConfig, client.Config{}, store, limits, nil, nil) + i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) // recover the checkpointed series diff --git a/pkg/util/runtime/config.go b/pkg/util/runtime/config.go index c1e7b773f57eb..4bb1fbfe31b5c 100644 --- a/pkg/util/runtime/config.go +++ b/pkg/util/runtime/config.go @@ -15,10 +15,18 @@ type TenantConfigs struct { tenantConfig TenantConfig } +// DefaultTenantConfigs creates and returns a new TenantConfigs with the defaults populated. +func DefaultTenantConfigs() *TenantConfigs { + return &TenantConfigs{ + defaultConfig: &Config{}, + tenantConfig: nil, + } +} + // NewTenantConfig makes a new TenantConfigs func NewTenantConfigs(tenantConfig TenantConfig) (*TenantConfigs, error) { return &TenantConfigs{ - defaultConfig: &Config{}, + defaultConfig: DefaultTenantConfigs().defaultConfig, tenantConfig: tenantConfig, }, nil } From b78e53fadc378c1f43a1509b0188ecbd12fcd575 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 9 Mar 2021 20:37:38 -0500 Subject: [PATCH 4/5] enable per tenant configs on push request logging. --- pkg/distributor/distributor.go | 5 ++- pkg/distributor/distributor_test.go | 3 +- pkg/distributor/http.go | 49 ++++++++++++++++++++++++++--- pkg/distributor/http_test.go | 3 +- pkg/ingester/instance.go | 12 +++++-- pkg/loki/modules.go | 2 +- pkg/util/runtime/config.go | 12 ++++++- 7 files changed, 74 insertions(+), 12 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7d916954efbed..e20ec7a233f18 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -27,6 +27,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -65,6 +66,7 @@ type Distributor struct { cfg Config clientCfg client.Config + tenantConfigs *runtime.TenantConfigs ingestersRing ring.ReadRing validator *Validator pool *ring_client.Pool @@ -82,7 +84,7 @@ type Distributor struct { } // New a distributor creates. -func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) { +func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) { factory := cfg.factory if factory == nil { factory = func(addr string) (ring_client.PoolClient, error) { @@ -121,6 +123,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr d := Distributor{ cfg: cfg, clientCfg: clientCfg, + tenantConfigs: configs, ingestersRing: ingestersRing, distributorsRing: distributorsRing, validator: validator, diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 5b4ac06dd5247..9dfab1dfc4b27 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" fe "github.com/grafana/loki/pkg/util/flagext" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -313,7 +314,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory } } - d, err := New(distributorConfig, clientConfig, ingestersRing, overrides, nil) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index c663488a4cb68..6f860a5744d59 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -10,6 +10,7 @@ import ( "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/dustin/go-humanize" + gokit "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -43,29 +44,67 @@ const applicationJSON = "application/json" // PushHandler reads a snappy-compressed proto from the HTTP body. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { - req, err := ParseRequest(r) + logger := util_log.WithContext(r.Context(), util_log.Logger) + userID, _ := user.ExtractOrgID(r.Context()) + req, err := ParseRequest(logger, userID, r) if err != nil { + if userID != "" && d.tenantConfigs.LogPushRequest(userID) { + level.Info(logger).Log( + "msg", "push request failed", + "code", http.StatusBadRequest, + "err", err, + ) + } http.Error(w, err.Error(), http.StatusBadRequest) return } + if userID != "" && d.tenantConfigs.LogPushRequestStreams(userID) { + streams := make([]string, 0, len(req.Streams)) + for _, s := range req.Streams { + streams = append(streams, s.Labels) + } + level.Info(logger).Log( + "msg", "push request streams", + "streams", streams, + ) + } + _, err = d.Push(r.Context(), req) if err == nil { + if userID != "" && d.tenantConfigs.LogPushRequest(userID) { + level.Info(logger).Log( + "msg", "push request successful", + ) + } w.WriteHeader(http.StatusNoContent) return } resp, ok := httpgrpc.HTTPResponseFromError(err) if ok { - http.Error(w, string(resp.Body), int(resp.Code)) + body := string(resp.Body) + if userID != "" && d.tenantConfigs.LogPushRequest(userID) { + level.Info(logger).Log( + "msg", "push request failed", + "code", resp.Code, + "err", body, + ) + } + http.Error(w, body, int(resp.Code)) } else { + if userID != "" && d.tenantConfigs.LogPushRequest(userID) { + level.Info(logger).Log( + "msg", "push request failed", + "code", http.StatusInternalServerError, + "err", err.Error(), + ) + } http.Error(w, err.Error(), http.StatusInternalServerError) } } -func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { - userID, _ := user.ExtractOrgID(r.Context()) - logger := util_log.WithContext(r.Context(), util_log.Logger) +func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) { var body lokiutil.SizeReader diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index a17ebffcb7488..0f68eb5e082a4 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -8,6 +8,7 @@ import ( "strings" "testing" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/stretchr/testify/assert" ) @@ -75,7 +76,7 @@ func TestParseRequest(t *testing.T) { if len(test.contentEncoding) > 0 { request.Header.Add("Content-Encoding", test.contentEncoding) } - data, err := ParseRequest(request) + data, err := ParseRequest(util_log.Logger, "", request) if test.valid { assert.Nil(t, err, "Should not give error for %d", index) assert.NotNil(t, data, "Should give data for %d", index) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 7f8597392fa25..96a8a2f49be97 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -208,7 +208,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r if i.configs.LogStreamCreation(i.instanceID) { level.Info(util_log.Logger).Log( "msg", "failed to create stream, exceeded limit", - "tenant", i.instanceID, + "org_id", i.instanceID, "err", err, "stream", pushReqStream.Labels, ) @@ -225,6 +225,14 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r labels, err := logql.ParseLabels(pushReqStream.Labels) if err != nil { + if i.configs.LogStreamCreation(i.instanceID) { + level.Info(util_log.Logger).Log( + "msg", "failed to create stream, failed to parse labels", + "org_id", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } fp := i.getHashForLabels(labels) @@ -252,7 +260,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r if i.configs.LogStreamCreation(i.instanceID) { level.Info(util_log.Logger).Log( "msg", "successfully created stream", - "tenant", i.instanceID, + "org_id", i.instanceID, "stream", pushReqStream.Labels, ) } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index d26a6e8e4015d..51b241bdf1917 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -152,7 +152,7 @@ func (t *Loki) initDistributor() (services.Service, error) { t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV var err error - t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides, prometheus.DefaultRegisterer) + t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/util/runtime/config.go b/pkg/util/runtime/config.go index 4bb1fbfe31b5c..110a9fce2a8df 100644 --- a/pkg/util/runtime/config.go +++ b/pkg/util/runtime/config.go @@ -1,7 +1,9 @@ package runtime type Config struct { - LogStreamCreation bool `yaml:"log_stream_creation"` + LogStreamCreation bool `yaml:"log_stream_creation"` + LogPushRequest bool `yaml:"log_push_request"` + LogPushRequestStreams bool `yaml:"log_push_request_streams"` } // TenantConfig is a function that returns configs for given tenant, or @@ -44,3 +46,11 @@ func (o *TenantConfigs) getOverridesForUser(userID string) *Config { func (o *TenantConfigs) LogStreamCreation(userID string) bool { return o.getOverridesForUser(userID).LogStreamCreation } + +func (o *TenantConfigs) LogPushRequest(userID string) bool { + return o.getOverridesForUser(userID).LogPushRequest +} + +func (o *TenantConfigs) LogPushRequestStreams(userID string) bool { + return o.getOverridesForUser(userID).LogPushRequestStreams +} From 77e765c6a177ba404ed6b39539378a46a75eaad9 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Tue, 9 Mar 2021 20:59:23 -0500 Subject: [PATCH 5/5] fixing up the stream log, changing log levels to debug --- cmd/loki/loki-local-config.yaml | 1 + pkg/distributor/http.go | 27 +++++++++++---------- pkg/ingester/instance.go | 6 ++--- pkg/promtail/targets/lokipush/pushtarget.go | 5 +++- 4 files changed, 22 insertions(+), 17 deletions(-) diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 0b45802732d00..6d39f98ae205e 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -2,6 +2,7 @@ auth_enabled: false server: http_listen_port: 3100 + grpc_listen_port: 9096 ingester: wal: diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 6f860a5744d59..8928ca066bad6 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "net/http" + "strings" "time" "github.com/cortexproject/cortex/pkg/util" @@ -48,8 +49,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { userID, _ := user.ExtractOrgID(r.Context()) req, err := ParseRequest(logger, userID, r) if err != nil { - if userID != "" && d.tenantConfigs.LogPushRequest(userID) { - level.Info(logger).Log( + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( "msg", "push request failed", "code", http.StatusBadRequest, "err", err, @@ -59,21 +60,21 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { return } - if userID != "" && d.tenantConfigs.LogPushRequestStreams(userID) { - streams := make([]string, 0, len(req.Streams)) + if d.tenantConfigs.LogPushRequestStreams(userID) { + var sb strings.Builder for _, s := range req.Streams { - streams = append(streams, s.Labels) + sb.WriteString(s.Labels) } - level.Info(logger).Log( + level.Debug(logger).Log( "msg", "push request streams", - "streams", streams, + "streams", sb.String(), ) } _, err = d.Push(r.Context(), req) if err == nil { - if userID != "" && d.tenantConfigs.LogPushRequest(userID) { - level.Info(logger).Log( + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( "msg", "push request successful", ) } @@ -84,8 +85,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { resp, ok := httpgrpc.HTTPResponseFromError(err) if ok { body := string(resp.Body) - if userID != "" && d.tenantConfigs.LogPushRequest(userID) { - level.Info(logger).Log( + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( "msg", "push request failed", "code", resp.Code, "err", body, @@ -93,8 +94,8 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { } http.Error(w, body, int(resp.Code)) } else { - if userID != "" && d.tenantConfigs.LogPushRequest(userID) { - level.Info(logger).Log( + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( "msg", "push request failed", "code", http.StatusInternalServerError, "err", err.Error(), diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 96a8a2f49be97..c98ebb8cf9551 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -206,7 +206,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r if err != nil { if i.configs.LogStreamCreation(i.instanceID) { - level.Info(util_log.Logger).Log( + level.Debug(util_log.Logger).Log( "msg", "failed to create stream, exceeded limit", "org_id", i.instanceID, "err", err, @@ -226,7 +226,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r labels, err := logql.ParseLabels(pushReqStream.Labels) if err != nil { if i.configs.LogStreamCreation(i.instanceID) { - level.Info(util_log.Logger).Log( + level.Debug(util_log.Logger).Log( "msg", "failed to create stream, failed to parse labels", "org_id", i.instanceID, "err", err, @@ -258,7 +258,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r i.addTailersToNewStream(stream) if i.configs.LogStreamCreation(i.instanceID) { - level.Info(util_log.Logger).Log( + level.Debug(util_log.Logger).Log( "msg", "successfully created stream", "org_id", i.instanceID, "stream", pushReqStream.Labels, diff --git a/pkg/promtail/targets/lokipush/pushtarget.go b/pkg/promtail/targets/lokipush/pushtarget.go index 9c3ea6d936cae..a2d488dbeeba7 100644 --- a/pkg/promtail/targets/lokipush/pushtarget.go +++ b/pkg/promtail/targets/lokipush/pushtarget.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/weaveworks/common/server" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/distributor" "github.com/grafana/loki/pkg/logproto" @@ -102,7 +103,9 @@ func (t *PushTarget) run() error { } func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) { - req, err := distributor.ParseRequest(r) + logger := util_log.WithContext(r.Context(), util_log.Logger) + userID, _ := user.ExtractOrgID(r.Context()) + req, err := distributor.ParseRequest(logger, userID, r) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest)