Skip to content

Commit

Permalink
chore: Clean up old streams periodically in RF-1 ingester (#13511)
Browse files Browse the repository at this point in the history
  • Loading branch information
benclive authored Jul 17, 2024
1 parent 8ef86f8 commit c0f5fb6
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 103 deletions.
73 changes: 23 additions & 50 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,25 @@ ingester_rf1:
# CLI flag: -ingester-rf1.lifecycler.ID
[id: <string> | default = "<hostname>"]

# The maximum age of a segment before it should be flushed. Increasing this
# value allows more time for a segment to grow to max-segment-size, but may
# increase latency if the write volume is too small.
# CLI flag: -ingester-rf1.max-segment-age
[max_segment_age: <duration> | default = 500ms]

# The maximum size of a segment before it should be flushed. It is not a
# strict limit, and segments can exceed the maximum size when individual
# appends are larger than the remaining capacity.
# CLI flag: -ingester-rf1.max-segment-size
[max_segment_size: <int> | default = 8388608]

# The maximum number of segments to buffer in-memory. Increasing this value
# allows for large bursts of writes to be buffered in memory, but may increase
# latency if the write volume exceeds the rate at which segments can be
# flushed.
# CLI flag: -ingester-rf1.max-segments
[max_segments: <int> | default = 10]

# How many flushes can happen concurrently from each stream.
# CLI flag: -ingester-rf1.concurrent-flushes
[concurrent_flushes: <int> | default = 32]
Expand Down Expand Up @@ -316,56 +335,6 @@ ingester_rf1:
# CLI flag: -ingester-rf1.flush-op-timeout
[flush_op_timeout: <duration> | default = 10m]

# The maximum age of a segment before it should be flushed. Increasing this
# value allows more time for a segment to grow to max-segment-size, but may
# increase latency if the write volume is too small.
# CLI flag: -ingester-rf1.max-segment-age
[max_segment_age: <duration> | default = 500ms]

# The maximum size of a segment before it should be flushed. It is not a
# strict limit, and segments can exceed the maximum size when individual
# appends are larger than the remaining capacity.
# CLI flag: -ingester-rf1.max-segment-size
[max_segment_size: <int> | default = 8388608]

# The maximum number of segments to buffer in-memory. Increasing this value
# allows for large bursts of writes to be buffered in memory, but may increase
# latency if the write volume exceeds the rate at which segments can be
# flushed.
# CLI flag: -ingester-rf1.max-segments
[max_segments: <int> | default = 10]

# How long chunks should be retained in-memory after they've been flushed.
# CLI flag: -ingester-rf1.chunks-retain-period
[chunk_retain_period: <duration> | default = 0s]

[chunk_idle_period: <duration>]

# The targeted _uncompressed_ size in bytes of a chunk block When this
# threshold is exceeded the head block will be cut and compressed inside the
# chunk.
# CLI flag: -ingester-rf1.chunks-block-size
[chunk_block_size: <int> | default = 262144]

# A target _compressed_ size in bytes for chunks. This is a desired size not
# an exact size, chunks may be slightly bigger or significantly smaller if
# they get flushed for other reasons (e.g. chunk_idle_period). A value of 0
# creates chunks with a fixed 10 blocks, a non zero value will create chunks
# with a variable number of blocks to meet the target size.
# CLI flag: -ingester-rf1.chunk-target-size
[chunk_target_size: <int> | default = 1572864]

# The algorithm to use for compressing chunk. (none, gzip, lz4-64k, snappy,
# lz4-256k, lz4-1M, lz4, flate, zstd)
# CLI flag: -ingester-rf1.chunk-encoding
[chunk_encoding: <string> | default = "gzip"]

# The maximum duration of a timeseries chunk in memory. If a timeseries runs
# for longer than this, the current chunk will be flushed to the store and a
# new chunk created.
# CLI flag: -ingester-rf1.max-chunk-age
[max_chunk_age: <duration> | default = 2h]

# Forget about ingesters having heartbeat timestamps older than
# `ring.kvstore.heartbeat_timeout`. This is equivalent to clicking on the
# `/ring` `forget` button in the UI: the ingester is removed from the ring.
Expand Down Expand Up @@ -400,6 +369,10 @@ ingester_rf1:
# CLI flag: -ingester-rf1.owned-streams-check-interval
[owned_streams_check_interval: <duration> | default = 30s]

# How long stream metadata is retained in memory after it was last seen.
# CLI flag: -ingester-rf1.stream-retain-period
[stream_retain_period: <duration> | default = 5m]

# Configures how the pattern ingester will connect to the ingesters.
client_config:
# Configures how connections are pooled.
Expand Down
81 changes: 39 additions & 42 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/client"
"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -68,23 +67,15 @@ var (
type Config struct {
Enabled bool `yaml:"enabled" doc:"description=Whether the ingester is enabled."`

LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."`

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
MaxSegmentAge time.Duration `yaml:"max_segment_age"`
MaxSegmentSize int `yaml:"max_segment_size"`
MaxSegments int `yaml:"max_segments"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
parsedEncoding chunkenc.Encoding `yaml:"-"` // placeholder for validated encoding
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty" doc:"description=Configures how the lifecycle of the ingester will operate and where it will register for discovery."`
MaxSegmentAge time.Duration `yaml:"max_segment_age"`
MaxSegmentSize int `yaml:"max_segment_size"`
MaxSegments int `yaml:"max_segments"`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
AutoForgetUnhealthy bool `yaml:"autoforget_unhealthy"`

MaxReturnedErrors int `yaml:"max_returned_stream_errors"`

Expand All @@ -101,6 +92,7 @@ type Config struct {
ShutdownMarkerPath string `yaml:"shutdown_marker_path"`

OwnedStreamsCheckInterval time.Duration `yaml:"owned_streams_check_interval" doc:"description=Interval at which the ingester ownedStreamService checks for changes in the ring to recalculate owned streams."`
StreamRetainPeriod time.Duration `yaml:"stream_retain_period" doc:"description=How long stream metadata is retained in memory after it was last seen."`

// Tee configs
ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."`
Expand All @@ -122,28 +114,17 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxSegmentSize, "ingester-rf1.max-segment-size", 8*1024*1024, "The maximum size of a segment before it should be flushed. It is not a strict limit, and segments can exceed the maximum size when individual appends are larger than the remaining capacity.")
f.IntVar(&cfg.MaxSegments, "ingester-rf1.max-segments", 10, "The maximum number of segments to buffer in-memory. Increasing this value allows for large bursts of writes to be buffered in memory, but may increase latency if the write volume exceeds the rate at which segments can be flushed.")

f.DurationVar(&cfg.RetainPeriod, "ingester-rf1.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
// f.DurationVar(&cfg.MaxChunkIdle, "ingester-rf1.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester-rf1.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
f.IntVar(&cfg.TargetChunkSize, "ingester-rf1.chunk-target-size", 1572864, "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.") // 1.5 MB
f.StringVar(&cfg.ChunkEncoding, "ingester-rf1.chunk-encoding", chunkenc.EncGZIP.String(), fmt.Sprintf("The algorithm to use for compressing chunk. (%s)", chunkenc.SupportedEncoding()))
f.IntVar(&cfg.MaxReturnedErrors, "ingester-rf1.max-ignored-stream-errors", 10, "The maximum number of errors a stream will report to the user when a push fails. 0 to make unlimited.")
f.DurationVar(&cfg.MaxChunkAge, "ingester-rf1.max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester-rf1.autoforget-unhealthy", false, "Forget about ingesters having heartbeat timestamps older than `ring.kvstore.heartbeat_timeout`. This is equivalent to clicking on the `/ring` `forget` button in the UI: the ingester is removed from the ring. This is a useful setting when you are sure that an unhealthy node won't return. An example is when not using stateful sets or the equivalent. Use `memberlist.rejoin_interval` > 0 to handle network partition cases when using a memberlist.")
f.IntVar(&cfg.IndexShards, "ingester-rf1.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
f.IntVar(&cfg.MaxDroppedStreams, "ingester-rf1.tailer.max-dropped-streams", 10, "Maximum number of dropped streams to keep in memory during tailing.")
f.StringVar(&cfg.ShutdownMarkerPath, "ingester-rf1.shutdown-marker-path", "", "Path where the shutdown marker file is stored. If not set and common.path_prefix is set then common.path_prefix will be used.")
f.DurationVar(&cfg.OwnedStreamsCheckInterval, "ingester-rf1.owned-streams-check-interval", 30*time.Second, "Interval at which the ingester ownedStreamService checks for changes in the ring to recalculate owned streams.")
f.DurationVar(&cfg.StreamRetainPeriod, "ingester-rf1.stream-retain-period", 5*time.Minute, "How long stream metadata should be retained in-memory after the last log was seen.")
f.BoolVar(&cfg.Enabled, "ingester-rf1.enabled", false, "Flag to enable or disable the usage of the ingester-rf1 component.")
}

func (cfg *Config) Validate() error {
enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
if err != nil {
return err
}
cfg.parsedEncoding = enc

if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
}
Expand Down Expand Up @@ -202,6 +183,7 @@ type Ingester struct {
store Storage
periodicConfigs []config.PeriodConfig

loopQuit chan struct{}
tailersQuit chan struct{}

// One queue per flush thread. Fingerprint is used to
Expand Down Expand Up @@ -253,8 +235,6 @@ func New(cfg Config, clientConfig client.Config,
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
compressionStats.Set(cfg.ChunkEncoding)
targetSizeStats.Set(int64(cfg.TargetChunkSize))
metrics := newIngesterMetrics(registerer)

walManager, err := wal.NewManager(wal.Config{
Expand All @@ -275,6 +255,7 @@ func New(cfg Config, clientConfig client.Config,
store: storage,
periodicConfigs: periodConfigs,
flushWorkersDone: sync.WaitGroup{},
loopQuit: make(chan struct{}),
tailersQuit: make(chan struct{}),
metrics: metrics,
// flushOnShutdownSwitch: &OnceSwitch{},
Expand Down Expand Up @@ -431,6 +412,7 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("can not ensure recalculate owned streams service is running: %w", err)
}

go i.periodicStreamMaintenance()
return nil
}

Expand Down Expand Up @@ -506,6 +488,31 @@ func (i *Ingester) removeShutdownMarkerFile() {
}
}

func (i *Ingester) periodicStreamMaintenance() {
streamRetentionTicker := time.NewTicker(i.cfg.StreamRetainPeriod)
defer streamRetentionTicker.Stop()

for {
select {
case <-streamRetentionTicker.C:
i.cleanIdleStreams()
case <-i.loopQuit:
return
}
}
}

func (i *Ingester) cleanIdleStreams() {
for _, instance := range i.getInstances() {
_ = instance.streams.ForEach(func(s *stream) (bool, error) {
if time.Since(s.highestTs) > i.cfg.StreamRetainPeriod {
instance.streams.Delete(s)
}
return true, nil
})
}
}

// PrepareShutdown will handle the /ingester/prepare_shutdown endpoint.
//
// Internally, when triggered, this handler will configure the ingester service to release their resources whenever a SIGTERM is received.
Expand Down Expand Up @@ -885,16 +892,6 @@ func (i *Ingester) getInstances() []*instance {
// return &resp, nil
//}

func adjustQueryStartTime(maxLookBackPeriod time.Duration, start, now time.Time) time.Time {
if maxLookBackPeriod > 0 {
oldestStartTime := now.Add(-maxLookBackPeriod)
if oldestStartTime.After(start) {
return oldestStartTime
}
}
return start
}

func (i *Ingester) GetDetectedFields(_ context.Context, r *logproto.DetectedFieldsRequest) (*logproto.DetectedFieldsResponse, error) {
return &logproto.DetectedFieldsResponse{
Fields: []*logproto.DetectedField{
Expand Down
43 changes: 43 additions & 0 deletions pkg/ingester-rf1/ingester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package ingesterrf1

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestIngester_cleanIdleStreams(t *testing.T) {
i := &Ingester{
instancesMtx: sync.RWMutex{},
instances: make(map[string]*instance),
cfg: Config{StreamRetainPeriod: time.Minute},
}
instance := &instance{
instanceID: "test",
streams: newStreamsMap(),
}
stream := &stream{
labelsString: "test,label",
highestTs: time.Now(),
}
instance.streams.Store(stream.labelsString, stream)
i.instances[instance.instanceID] = instance

require.Len(t, i.instances, 1)
require.Equal(t, 1, instance.streams.Len())

// No-op
i.cleanIdleStreams()

require.Len(t, i.instances, 1)
require.Equal(t, 1, instance.streams.Len())

// Pretend stream is old and retry
stream.highestTs = time.Now().Add(-time.Minute * 2)
i.cleanIdleStreams()

require.Len(t, i.instances, 1)
require.Equal(t, 0, instance.streams.Len())
}
12 changes: 1 addition & 11 deletions pkg/ingester-rf1/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,6 @@ type stream struct {
chunkHeadBlockFormat chunkenc.HeadBlockFmt
}

type chunkDesc struct {
chunk *chunkenc.MemChunk
closed bool
synced bool
flushed time.Time
reason string

lastUpdated time.Time
}

type entryWithError struct {
entry *logproto.Entry
e error
Expand Down Expand Up @@ -269,7 +259,7 @@ func (s *stream) validateEntries(ctx context.Context, entries []logproto.Entry,
}

// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
cutoff := highestTs.Add(-time.Hour)
if s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
Expand Down

0 comments on commit c0f5fb6

Please sign in to comment.