Skip to content

Commit

Permalink
feat: add recalculateOwnedStreams to check stream ownership if the ri…
Browse files Browse the repository at this point in the history
…ng is changed (#13103)

Signed-off-by: JordanRushing <[email protected]>
Signed-off-by: Vladyslav Diachenko <[email protected]>
Co-authored-by: Vladyslav Diachenko <[email protected]>
Co-authored-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
3 people authored Jun 14, 2024
1 parent 24c583f commit e7689b2
Show file tree
Hide file tree
Showing 86 changed files with 7,285 additions and 1,417 deletions.
31 changes: 30 additions & 1 deletion docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2879,6 +2879,11 @@ wal:
# common.path_prefix is set then common.path_prefix will be used.
# CLI flag: -ingester.shutdown-marker-path
[shutdown_marker_path: <string> | default = ""]

# Interval at which the ingester ownedStreamService checks for changes in the
# ring to recalculate owned streams.
# CLI flag: -ingester.owned-streams-check-interval
[owned_streams_check_interval: <duration> | default = 30s]
```
### ingester_client
Expand Down Expand Up @@ -3540,7 +3545,7 @@ When a memberlist config with atleast 1 join_members is defined, kvstore of type
# The timeout for establishing a connection with a remote node, and for
# read/write operations.
# CLI flag: -memberlist.stream-timeout
[stream_timeout: <duration> | default = 10s]
[stream_timeout: <duration> | default = 2s]
# Multiplication factor used when sending out messages (factor * log(N+1)).
# CLI flag: -memberlist.retransmit-factor
Expand Down Expand Up @@ -4755,6 +4760,10 @@ Configures the `server` of the launched module(s).
# CLI flag: -server.grpc-conn-limit
[grpc_listen_conn_limit: <int> | default = 0]
# Enables PROXY protocol.
# CLI flag: -server.proxy-protocol-enabled
[proxy_protocol_enabled: <boolean> | default = false]
# Comma-separated list of cipher suites to use. If blank, the default Go cipher
# suites is used.
# CLI flag: -server.tls-cipher-suites
Expand Down Expand Up @@ -4909,6 +4918,21 @@ grpc_tls_config:
# CLI flag: -server.grpc.num-workers
[grpc_server_num_workers: <int> | default = 0]
# If true, the request_message_bytes, response_message_bytes, and
# inflight_requests metrics will be tracked. Enabling this option prevents the
# use of memory pools for parsing gRPC request bodies and may lead to more
# memory allocations.
# CLI flag: -server.grpc.stats-tracking-enabled
[grpc_server_stats_tracking_enabled: <boolean> | default = true]
# If true, gGPC's buffer pools will be used to handle incoming requests.
# Enabling this feature can reduce memory allocation, but also requires
# disabling GRPC server stats tracking by setting
# `server.grpc.stats-tracking-enabled=false`. This is an experimental gRPC
# feature, so it might be removed in a future version of the gRPC library.
# CLI flag: -server.grpc.recv-buffer-pools-enabled
[grpc_server_recv_buffer_pools_enabled: <boolean> | default = false]

# Output log messages in the given format. Valid formats: [logfmt, json]
# CLI flag: -log.format
[log_format: <string> | default = "logfmt"]
Expand All @@ -4922,6 +4946,11 @@ grpc_tls_config:
# CLI flag: -server.log-source-ips-enabled
[log_source_ips_enabled: <boolean> | default = false]

# Log all source IPs instead of only the originating one. Only used if
# server.log-source-ips-enabled is true
# CLI flag: -server.log-source-ips-full
[log_source_ips_full: <boolean> | default = false]

# Header field storing the source IPs. Only used if
# server.log-source-ips-enabled is true. If not set the default Forwarded,
# X-Real-IP and X-Forwarded-For headers are used
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ require (
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.5.0
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb
github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
github.com/grafana/regexp v0.0.0-20221122212121-6b5c0a4cb7fd
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand Down Expand Up @@ -153,6 +153,7 @@ require (
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
Expand Down Expand Up @@ -312,7 +313,6 @@ require (
github.com/sercand/kuberesolver/v5 v5.1.1 // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1017,14 +1017,14 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb h1:AWE6+kvtE18HP+lRWNUCyvymyrFSXs6TcS2vXIXGIuw=
github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb/go.mod h1:kkWM4WUV230bNG3urVRWPBnSJHs64y/0RmWjftnnn0c=
github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3 h1:k8vINlI4w+RYc37NRwQlRe/IHYoEbu6KAe2XdGDeV1U=
github.com/grafana/dskit v0.0.0-20240528015923-27d7d41066d3/go.mod h1:HvSf3uf8Ps2vPpzHeAFyZTdUcbVr+Rxpq1xcx7J/muc=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0 h1:aLBiDMjTtXx2800iCIp+8kdjIlvGX0MF/zICQMQO2qU=
github.com/grafana/gomemcache v0.0.0-20231204155601-7de47a8c3cb0/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE=
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56/go.mod h1:PGk3RjYHpxMM8HFPhKKo+vve3DdlPUELZLSDEFehPuU=
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d h1:YwbJJ/PrVWVdnR+j/EAVuazdeP+Za5qbiH1Vlr+wFXs=
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d/go.mod h1:796sq+UcONnSlzA3RtlBZ+b/hrerkZXiEmO8oMjyRwY=
github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU=
Expand Down Expand Up @@ -1541,6 +1541,8 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -1697,8 +1699,6 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/softlayer/softlayer-go v0.0.0-20180806151055-260589d94c7d/go.mod h1:Cw4GTlQccdRGSEf6KiMju767x0NEHE0YIVPJSaXjlsw=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/soniah/gosnmp v1.25.0/go.mod h1:8YvfZxH388NIIw2A+X5z2Oh97VcNhtmxDLt5QeUzVuQ=
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
Expand Down
32 changes: 21 additions & 11 deletions pkg/ingester/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ func TestIngesterWAL(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -113,7 +115,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -127,7 +129,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -150,7 +152,9 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -196,7 +200,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -253,7 +257,9 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand All @@ -274,7 +280,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)

// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand All @@ -295,7 +301,9 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand All @@ -316,7 +324,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))

// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down Expand Up @@ -591,7 +599,9 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
}
}

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
readRingMock := mockReadRingWithOneActiveIngester()

i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -663,7 +673,7 @@ func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
require.NoError(t, err)

// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokit_log.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
Expand Down
5 changes: 4 additions & 1 deletion pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,12 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
chunks: map[string][]chunk.Chunk{},
}

readRingMock := mockReadRingWithOneActiveIngester()

limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)

ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, gokitlog.NewNopLogger(), nil, readRingMock)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

Expand Down Expand Up @@ -376,6 +378,7 @@ func defaultIngesterTestConfig(t testing.TB) Config {
cfg.BlockSize = 256 * 1024
cfg.TargetChunkSize = 1500 * 1024
cfg.WAL.Enabled = false
cfg.OwnedStreamsCheckInterval = 1 * time.Second
return cfg
}

Expand Down
Loading

0 comments on commit e7689b2

Please sign in to comment.