Skip to content

Commit

Permalink
Collect the reason why we decided to flush a series (#761)
Browse files Browse the repository at this point in the history
  • Loading branch information
bboreham authored Mar 21, 2018
1 parent 4fcdbc6 commit 74ec287
Showing 1 changed file with 27 additions and 13 deletions.
40 changes: 27 additions & 13 deletions pkg/ingester/ingester_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ func (i *Ingester) sweepUsers(immediate bool) {
}
}

type flushReason int

const (
noFlush = iota
reasonImmediate
reasonMultipleChunksInSeries
reasonAged
reasonIdle
)

// sweepSeries schedules a series for flushing based on a set of criteria
//
// NB we don't close the head chunk here, as the series could wait in the queue
Expand All @@ -70,39 +80,42 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo
firstTime := series.firstTime()
flush := i.shouldFlushSeries(series, immediate)

if flush {
if flush != noFlush {
flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
util.Event().Log("msg", "add to flush queue", "userID", userID, "numChunks", len(series.chunkDescs), "firstTime", firstTime, "fp", fp, "series", series.metric)
util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric)
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
}
}

func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) bool {
func (i *Ingester) shouldFlushSeries(series *memorySeries, immediate bool) flushReason {
if immediate {
return reasonImmediate
}
// Series should be scheduled for flushing if they have more than one chunk
if immediate || len(series.chunkDescs) > 1 {
return true
if len(series.chunkDescs) > 1 {
return reasonMultipleChunksInSeries
}

// Or if the only existing chunk need flushing
if len(series.chunkDescs) > 0 {
return i.shouldFlushChunk(series.chunkDescs[0])
}

return false
return noFlush
}

func (i *Ingester) shouldFlushChunk(c *desc) bool {
func (i *Ingester) shouldFlushChunk(c *desc) flushReason {
// Chunks should be flushed if they span longer than MaxChunkAge
if c.LastTime.Sub(c.FirstTime) > i.cfg.MaxChunkAge {
return true
return reasonAged
}

// Chunk should be flushed if their last update is older then MaxChunkIdle
if model.Now().Sub(c.LastUpdate) > i.cfg.MaxChunkIdle {
return true
return reasonIdle
}

return false
return noFlush
}

func (i *Ingester) flushLoop(j int) {
Expand Down Expand Up @@ -148,14 +161,15 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
}

userState.fpLocker.Lock(fp)
if !i.shouldFlushSeries(series, immediate) {
reason := i.shouldFlushSeries(series, immediate)
if reason == noFlush {
userState.fpLocker.Unlock(fp)
return nil
}

// Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it.
chunks := series.chunkDescs
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head())) {
if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head()) != noFlush) {
series.closeHead()
} else {
chunks = chunks[:len(chunks)-1]
Expand All @@ -171,7 +185,7 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel() // releases resources if slowOperation completes before timeout elapses

util.Event().Log("msg", "flush chunks", "userID", userID, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric)
util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric)
err := i.flushChunks(ctx, fp, series.metric, chunks)
if err != nil {
util.Event().Log("msg", "flush error", "userID", userID, "err", err, "fp", fp, "series", series.metric)
Expand Down

0 comments on commit 74ec287

Please sign in to comment.