-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added tail length limit to limit duration of live tailing of logs to 1 hour #756
Changes from 2 commits
96becf3
45e55a1
028b4c2
5d97006
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -83,12 +83,13 @@ type Tailer struct { | |||||
querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters | ||||||
querierTailClientsMtx sync.Mutex | ||||||
|
||||||
stopped bool | ||||||
blocked bool | ||||||
blockedMtx sync.RWMutex | ||||||
delayFor time.Duration | ||||||
responseChan chan *TailResponse | ||||||
closeErrChan chan error | ||||||
stopped bool | ||||||
blocked bool | ||||||
blockedMtx sync.RWMutex | ||||||
delayFor time.Duration | ||||||
responseChan chan *TailResponse | ||||||
closeErrChan chan error | ||||||
tailMaxDuration time.Duration | ||||||
|
||||||
// when tail client is slow, drop entry and store its details in droppedEntries to notify client | ||||||
droppedEntries []droppedEntry | ||||||
|
@@ -103,22 +104,31 @@ func (t *Tailer) readTailClients() { | |||||
// keeps sending oldest entry to responseChan. If channel is blocked drop the entry | ||||||
// When channel is unblocked, send details of dropped entries with current entry | ||||||
func (t *Tailer) loop() { | ||||||
ticker := time.NewTicker(checkConnectionsWithIngestersPeriod) | ||||||
defer ticker.Stop() | ||||||
checkConnectionTicker := time.NewTicker(checkConnectionsWithIngestersPeriod) | ||||||
defer checkConnectionTicker.Stop() | ||||||
|
||||||
tailMaxDurationTicker := time.NewTicker(t.tailMaxDuration) | ||||||
defer tailMaxDurationTicker.Stop() | ||||||
|
||||||
tailResponse := new(TailResponse) | ||||||
|
||||||
for { | ||||||
if t.stopped { | ||||||
break | ||||||
return | ||||||
} | ||||||
|
||||||
select { | ||||||
case <-ticker.C: | ||||||
case <-checkConnectionTicker.C: | ||||||
// Try to reconnect dropped ingesters and connect to new ingesters | ||||||
if err := t.checkIngesterConnections(); err != nil { | ||||||
level.Error(util.Logger).Log("Error reconnecting to disconnected ingesters", fmt.Sprintf("%v", err)) | ||||||
} | ||||||
case <-tailMaxDurationTicker.C: | ||||||
if err := t.close(); err != nil { | ||||||
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
} | ||||||
t.closeErrChan <- errors.New("Reached tail max duration limit") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error strings should not start with a capital letter because they'll often be prefixed before printing. |
||||||
return | ||||||
default: | ||||||
} | ||||||
|
||||||
|
@@ -135,7 +145,7 @@ func (t *Tailer) loop() { | |||||
level.Error(util.Logger).Log("Error closing Tailer", fmt.Sprintf("%v", err)) | ||||||
} | ||||||
t.closeErrChan <- errors.New("All ingesters closed the connection") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while you're at it, could you also remove the capital letter here. |
||||||
break | ||||||
return | ||||||
} | ||||||
time.Sleep(nextEntryWait) | ||||||
continue | ||||||
|
@@ -316,7 +326,7 @@ func (t *Tailer) getCloseErrorChan() <-chan error { | |||||
|
||||||
func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Querier_TailClient, | ||||||
queryDroppedStreams func(from, to time.Time, labels string) (iter.EntryIterator, error), | ||||||
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error)) *Tailer { | ||||||
tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration) *Tailer { | ||||||
t := Tailer{ | ||||||
openStreamIterator: iter.NewHeapIterator([]iter.EntryIterator{}, logproto.FORWARD), | ||||||
//droppedStreamsIterator: &droppedStreamsIterator{}, | ||||||
|
@@ -326,6 +336,7 @@ func newTailer(delayFor time.Duration, querierTailClients map[string]logproto.Qu | |||||
responseChan: make(chan *TailResponse, bufferSizeForTailResponse), | ||||||
closeErrChan: make(chan error), | ||||||
tailDisconnectedIngesters: tailDisconnectedIngesters, | ||||||
tailMaxDuration: tailMaxDuration, | ||||||
} | ||||||
|
||||||
t.readTailClients() | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.