From 96becf3a0b5f11db2a0a7750c9b8d2279b2e9852 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Mon, 15 Jul 2019 19:58:16 +0530 Subject: [PATCH 1/4] Added tail length limit to limit duration of live tailing of logs to 1 hour --- pkg/querier/querier.go | 4 +++- pkg/querier/tail.go | 32 ++++++++++++++++++++++---------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 633fc587ffd4f..ea209f2e5a7f2 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -22,10 +22,12 @@ import ( // Config for a querier. type Config struct { + TailLengthLimit time.Duration `yaml:"tail_length_limit"` } // RegisterFlags register flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.TailLengthLimit, "querier.tail-length-limit", 1*time.Hour, "Limit the duration for which live tailing request would be served") } // Querier handlers queries. @@ -262,7 +264,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, return q.queryDroppedStreams(ctx, req, from, to, labels) }, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr) - }), nil + }, q.cfg.TailLengthLimit), nil } // passed to tailer for querying dropped streams diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 766fa984685d9..d7b3b1ab2bcf7 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -83,12 +83,13 @@ type Tailer struct { querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters querierTailClientsMtx sync.Mutex - stopped bool - blocked bool - blockedMtx sync.RWMutex - delayFor time.Duration - responseChan chan *TailResponse - closeErrChan chan error + stopped bool + blocked bool + blockedMtx sync.RWMutex + delayFor time.Duration + responseChan chan *TailResponse + closeErrChan chan error + tailLengthLimit time.Duration // when tail client is slow, drop entry and store its details in droppedEntries to notify client droppedEntries []droppedEntry @@ -103,22 +104,32 @@ func (t *Tailer) readTailClients() { // keeps sending oldest entry to responseChan. If channel is blocked drop the entry // When channel is unblocked, send details of dropped entries with current entry func (t *Tailer) loop() { - ticker := time.NewTicker(checkConnectionsWithIngestersPeriod) - defer ticker.Stop() + checkConnectionTicker := time.NewTicker(checkConnectionsWithIngestersPeriod) + defer checkConnectionTicker.Stop() + + tailLengthLimitTicker := time.NewTicker(t.tailLengthLimit) + defer tailLengthLimitTicker.Stop() tailResponse := new(TailResponse) +outer: for { if t.stopped { break } select { - case <-ticker.C: + case <-checkConnectionTicker.C: // Try to reconnect dropped ingesters and connect to new ingesters if err := t.checkIngesterConnections(); err != nil { level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err)) } + case <-tailLengthLimitTicker.C: + if err := t.close(); err != nil { + level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + } + t.closeErrChan <- errors.New("Reached tail length limit") + break outer default: } @@ -316,7 +327,7 @@ func (t *Tailer) getCloseErrorChan() <-chan error { func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient, queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error), - tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)) *Tailer { + tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailLengthLimit time.Duration) *Tailer { t := Tailer{ openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), //droppedStreamsIterator: &droppedStreamsIterator{}, @@ -326,6 +337,7 @@ func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Qu responseChan: make(chan *TailResponse, bufferSizeForTailResponse), closeErrChan: make(chan error), tailDisconnectedIngesters: tailDisconnectedIngesters, + tailLengthLimit: tailLengthLimit, } t.readTailClients() From 45e55a1d8b887006a3723bcca88aecc6a7abd209 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 17 Jul 2019 11:50:02 +0530 Subject: [PATCH 2/4] Some code refactoring suggested from PR review for live tailing duration limit --- pkg/querier/querier.go | 6 +++--- pkg/querier/tail.go | 21 ++++++++++----------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index ea209f2e5a7f2..e1cca3755b550 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -22,12 +22,12 @@ import ( // Config for a querier. type Config struct { - TailLengthLimit time.Duration `yaml:"tail_length_limit"` + TailMaxDuration time.Duration `yaml:"tail_max_duration"` } // RegisterFlags register flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.DurationVar(&cfg.TailLengthLimit, "querier.tail-length-limit", 1*time.Hour, "Limit the duration for which live tailing request would be served") + f.DurationVar(&cfg.TailMaxDuration, "querier.tail-max-duration", 1*time.Hour, "Limit the duration for which live tailing request would be served") } // Querier handlers queries. @@ -264,7 +264,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, return q.queryDroppedStreams(ctx, req, from, to, labels) }, func(connectedIngestersAddr []string) (map[string]logproto.Querier_TailClient, error) { return q.tailDisconnectedIngesters(ctx, req, connectedIngestersAddr) - }, q.cfg.TailLengthLimit), nil + }, q.cfg.TailMaxDuration), nil } // passed to tailer for querying dropped streams diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index d7b3b1ab2bcf7..ca96f4981635f 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -89,7 +89,7 @@ type Tailer struct { delayFor time.Duration responseChan chan *TailResponse closeErrChan chan error - tailLengthLimit time.Duration + tailMaxDuration time.Duration // when tail client is slow, drop entry and store its details in droppedEntries to notify client droppedEntries []droppedEntry @@ -107,15 +107,14 @@ func (t *Tailer) loop() { checkConnectionTicker := time.NewTicker(checkConnectionsWithIngestersPeriod) defer checkConnectionTicker.Stop() - tailLengthLimitTicker := time.NewTicker(t.tailLengthLimit) - defer tailLengthLimitTicker.Stop() + tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration) + defer tailMaxDurationTicker.Stop() tailResponse := new(TailResponse) -outer: for { if t.stopped { - break + return } select { @@ -124,12 +123,12 @@ outer: if err := t.checkIngesterConnections(); err != nil { level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err)) } - case <-tailLengthLimitTicker.C: + case <-tailMaxDurationTicker.C: if err := t.close(); err != nil { level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) } - t.closeErrChan <- errors.New("Reached tail length limit") - break outer + t.closeErrChan <- errors.New("Reached tail max duration limit") + return default: } @@ -146,7 +145,7 @@ outer: level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) } t.closeErrChan <- errors.New("All ingesters closed the connection") - break + return } time.Sleep(nextEntryWait) continue @@ -327,7 +326,7 @@ func (t *Tailer) getCloseErrorChan() <-chan error { func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient, queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error), - tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailLengthLimit time.Duration) *Tailer { + tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration) *Tailer { t := Tailer{ openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), //droppedStreamsIterator: &droppedStreamsIterator{}, @@ -337,7 +336,7 @@ func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Qu responseChan: make(chan *TailResponse, bufferSizeForTailResponse), closeErrChan: make(chan error), tailDisconnectedIngesters: tailDisconnectedIngesters, - tailLengthLimit: tailLengthLimit, + tailMaxDuration: tailMaxDuration, } t.readTailClients() From 028b4c2ff0fa518f42a5e1bdea344e5ebab92efa Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 17 Jul 2019 18:39:06 +0530 Subject: [PATCH 3/4] Fixed error messages in live tailing of logs --- pkg/querier/tail.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index ca96f4981635f..cf4ea59562b7c 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -121,13 +121,13 @@ func (t *Tailer) loop() { case <-checkConnectionTicker.C: // Try to reconnect dropped ingesters and connect to new ingesters if err := t.checkIngesterConnections(); err != nil { - level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg","Error reconnecting to disconnected ingesters", "err",err) } case <-tailMaxDurationTicker.C: if err := t.close(); err != nil { - level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) + level.Error(util.Logger).Log("msg","Error closing Tailer", "err", err) } - t.closeErrChan <- errors.New("Reached tail max duration limit") + t.closeErrChan <- errors.New("reached tail max duration limit") return default: } @@ -144,7 +144,7 @@ func (t *Tailer) loop() { if err := t.close(); err != nil { level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) } - t.closeErrChan <- errors.New("All ingesters closed the connection") + t.closeErrChan <- errors.New("all ingesters closed the connection") return } time.Sleep(nextEntryWait) From 5d97006c00dd61b2dd1b5a0e043576f17af2ff58 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Wed, 17 Jul 2019 18:48:32 +0530 Subject: [PATCH 4/4] Fixed lint errors --- pkg/querier/tail.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index cf4ea59562b7c..a0845e011eef9 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -121,11 +121,11 @@ func (t *Tailer) loop() { case <-checkConnectionTicker.C: // Try to reconnect dropped ingesters and connect to new ingesters if err := t.checkIngesterConnections(); err != nil { - level.Error(util.Logger).Log("msg","Error reconnecting to disconnected ingesters", "err",err) + level.Error(util.Logger).Log("msg", "Error reconnecting to disconnected ingesters", "err", err) } case <-tailMaxDurationTicker.C: if err := t.close(); err != nil { - level.Error(util.Logger).Log("msg","Error closing Tailer", "err", err) + level.Error(util.Logger).Log("msg", "Error closing Tailer", "err", err) } t.closeErrChan <- errors.New("reached tail max duration limit") return