Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Encapsulate all isolation details.
Browse files Browse the repository at this point in the history
Signed-off-by: Goutham Veeramachaneni <[email protected]>
  • Loading branch information
gouthamve committed Apr 4, 2018
1 parent d8db95c commit 401f50d
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 172 deletions.
13 changes: 0 additions & 13 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,6 @@ type BlockReader interface {
Tombstones() (TombstoneReader, error)
}

// IsolationState holds the isolation information.
type IsolationState struct {
// We will ignore all writes above the max, or that are incomplete.
maxWriteID uint64
incompleteWrites map[uint64]struct{}
lowWaterMark uint64 // Lowest of incompleteWrites/maxWriteId.
head *Head

// Doubly linked list of active reads.
next *IsolationState
prev *IsolationState
}

// BlockMeta provides meta information about a block.
type BlockMeta struct {
// Unique identifier for the block and its contents. Changes on compaction.
Expand Down
151 changes: 33 additions & 118 deletions head.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,7 @@ type Head struct {

tombstones memTombstones

// Mutex for accessing writeLastId and writesOpen.
writeMtx sync.Mutex
// Each write is given an internal id.
lastWriteID uint64
// Which writes are currently in progress.
writesOpen map[uint64]struct{}
// Mutex for accessing readLastId.
// If taking both writeMtx and readMtx, take writeMtx first.
readMtx sync.Mutex
// All current in use isolationStates. This is a doubly-linked list.
readsOpen *IsolationState
iso *isolation
}

type headMetrics struct {
Expand Down Expand Up @@ -166,15 +156,15 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
Name: "tsdb_isolation_low_watermark",
Help: "The lowest write id that is still referenced.",
}, func() float64 {
return float64(h.readLowWatermark())
return float64(h.iso.lowWatermark())
})
m.highWatermark = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_isolation_high_watermark",
Help: "The highest write id that has been given out.",
}, func() float64 {
h.writeMtx.Lock()
defer h.writeMtx.Unlock()
return float64(h.lastWriteID)
h.iso.writeMtx.Lock()
defer h.iso.writeMtx.Unlock()
return float64(h.iso.lastWriteID)
})

if r != nil {
Expand Down Expand Up @@ -210,9 +200,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
if chunkRange < 1 {
return nil, errors.Errorf("invalid chunk range %d", chunkRange)
}
headIso := &IsolationState{}
headIso.next = headIso
headIso.prev = headIso

h := &Head{
wal: wal,
Expand All @@ -226,8 +213,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
postings: index.NewUnorderedMemPostings(),
tombstones: memTombstones{},

writesOpen: map[uint64]struct{}{},
readsOpen: headIso,
iso: newIsolation(),
}
h.metrics = newHeadMetrics(h, r)

Expand Down Expand Up @@ -428,7 +414,7 @@ func (h *rangeHead) Index() (IndexReader, error) {
}

func (h *rangeHead) Chunks() (ChunkReader, error) {
return h.head.chunksRange(h.mint, h.maxt, h.head.IsolationState()), nil
return h.head.chunksRange(h.mint, h.maxt, h.head.iso.State()), nil
}

func (h *rangeHead) Tombstones() (TombstoneReader, error) {
Expand Down Expand Up @@ -480,13 +466,8 @@ func (a *initAppender) Rollback() error {
func (h *Head) Appender() Appender {
h.metrics.activeAppenders.Inc()

h.writeMtx.Lock()
h.lastWriteID++
writeID := h.lastWriteID
h.writesOpen[writeID] = struct{}{}
h.writeMtx.Unlock()

cleanupWriteIDsBelow := h.readLowWatermark()
writeID := h.iso.newWriteID()
cleanupWriteIDsBelow := h.iso.lowWatermark()

// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
Expand Down Expand Up @@ -614,9 +595,7 @@ func (a *headAppender) Commit() error {
}
}

a.head.writeMtx.Lock()
delete(a.head.writesOpen, a.writeID)
a.head.writeMtx.Unlock()
a.head.iso.closeWrite(a.writeID)

return nil
}
Expand Down Expand Up @@ -725,14 +704,14 @@ func (h *Head) indexRange(mint, maxt int64) *headIndexReader {

// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64, h.IsolationState()), nil
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State()), nil
}

func (h *Head) chunksRange(mint, maxt int64, isolation *IsolationState) *headChunkReader {
func (h *Head) chunksRange(mint, maxt int64, isoState *IsolationState) *headChunkReader {
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headChunkReader{head: h, mint: mint, maxt: maxt, isolation: isolation}
return &headChunkReader{head: h, mint: mint, maxt: maxt, isoState: isoState}
}

// MinTime returns the lowest time bound on visible data in the head.
Expand All @@ -754,11 +733,11 @@ type headChunkReader struct {
head *Head
mint, maxt int64

isolation *IsolationState
isoState *IsolationState
}

func (h *headChunkReader) Close() error {
h.isolation.Close()
h.isoState.Close()
return nil
}

Expand Down Expand Up @@ -809,7 +788,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
s: s,
cid: int(cid),

isolation: h.isolation,
isoState: h.isoState,
}, nil
}

Expand All @@ -818,12 +797,12 @@ type safeChunk struct {
s *memSeries
cid int

isolation *IsolationState
isoState *IsolationState
}

func (c *safeChunk) Iterator() chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, c.isolation)
it := c.s.iterator(c.cid, c.isoState)
c.s.Unlock()
return it
}
Expand Down Expand Up @@ -983,20 +962,6 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
return s, true
}

// readLowWatermark returns the writeId below which
// we no longer need to track which writes were from
// which writeId.
func (h *Head) readLowWatermark() uint64 {
h.writeMtx.Lock() // Take writeMtx first.
defer h.writeMtx.Unlock()
h.readMtx.Lock()
defer h.readMtx.Unlock()
if h.readsOpen.prev == h.readsOpen {
return h.lastWriteID
}
return h.readsOpen.prev.lowWaterMark
}

// seriesHashmap is a simple hashmap for memSeries by their label set. It is built
// on top of a regular hashmap and holds a slice of series to resolve hash collisions.
// Its methods require the hash to be submitted with it to avoid re-computations throughout
Expand Down Expand Up @@ -1185,10 +1150,7 @@ type memSeries struct {

app chunkenc.Appender // Current appender for the chunk.

// Write ids of most recent samples. This is a ring buffer.
writeIDs []uint64
writeIDFirst int // Position of first id in the ring.
writeIDCount int // How many ids in the ring.
txs *txRing
}

func (s *memSeries) minTime() int64 {
Expand Down Expand Up @@ -1225,7 +1187,7 @@ func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
writeIDs: make([]uint64, 4),
txs: newTxRing(4),
}
return s
}
Expand Down Expand Up @@ -1316,40 +1278,15 @@ func (s *memSeries) append(t int64, v float64, writeID uint64) (success, chunkCr
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}

if s.writeIDCount == len(s.writeIDs) {
// Ring buffer is full, expand by doubling.
newRing := make([]uint64, s.writeIDCount*2)
idx := copy(newRing[:], s.writeIDs[s.writeIDFirst%len(s.writeIDs):])
copy(newRing[idx:], s.writeIDs[:s.writeIDFirst%len(s.writeIDs)])
s.writeIDs = newRing
s.writeIDFirst = 0
}
s.writeIDs[(s.writeIDFirst+s.writeIDCount)%len(s.writeIDs)] = writeID
s.writeIDCount++
s.txs.add(writeID)

return true, chunkCreated
}

// cleanupWriteIDsBelow cleans up older writeIds. Has to be called after acquiring
// lock.
func (s *memSeries) cleanupWriteIDsBelow(bound uint64) {
pos := s.writeIDFirst

for s.writeIDCount > 0 {
if s.writeIDs[pos] < bound {
s.writeIDFirst++
s.writeIDCount--
} else {
break
}
pos++
if pos == len(s.writeIDs) {
pos = 0
}
}
if s.writeIDFirst >= len(s.writeIDs) {
s.writeIDFirst -= len(s.writeIDs)
}
s.txs.cleanupWriteIDsBelow(bound)
}

func (s *memSeries) cleanupExtraWriteIds() {
Expand All @@ -1358,28 +1295,7 @@ func (s *memSeries) cleanupExtraWriteIds() {
totalSamples += c.chunk.NumSamples()
}

if s.writeIDCount <= totalSamples {
return
}

s.writeIDFirst += (s.writeIDCount - totalSamples)
s.writeIDCount = totalSamples

newBufSize := len(s.writeIDs)
for totalSamples < newBufSize/2 {
newBufSize = newBufSize / 2
}

if newBufSize == len(s.writeIDs) {
return
}

newRing := make([]uint64, newBufSize)
idx := copy(newRing[:], s.writeIDs[s.writeIDFirst%len(s.writeIDs):])
copy(newRing[idx:], s.writeIDs[:s.writeIDFirst%len(s.writeIDs)])

s.writeIDs = newRing
s.writeIDFirst = 0
s.txs.cutoffN(totalSamples)
}

// computeChunkEndTime estimates the end timestamp based the beginning of a chunk,
Expand All @@ -1393,7 +1309,7 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a
}

func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterator {
func (s *memSeries) iterator(id int, isoState *IsolationState) chunkenc.Iterator {
c := s.chunk(id)

// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
Expand All @@ -1408,7 +1324,7 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato
numSamples := c.chunk.NumSamples()
stopAfter := numSamples

if isolation != nil {
if isoState != nil {
totalSamples := 0
previousSamples := 0 // Samples before this chunk.
for j, d := range s.chunks {
Expand All @@ -1417,23 +1333,22 @@ func (s *memSeries) iterator(id int, isolation *IsolationState) chunkenc.Iterato
previousSamples += d.chunk.NumSamples()
}
}
writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - s.writeIDCount)

writeIdsToConsider := (previousSamples + c.chunk.NumSamples()) - (totalSamples - s.txs.txIDCount)
// Iterate over the ring, find the first one that the isolation state says not
// to return.
pos := s.writeIDFirst
it := s.txs.iterator()
for index := 0; index < writeIdsToConsider; index++ {
writeID := s.writeIDs[pos]
if _, ok := isolation.incompleteWrites[writeID]; ok || writeID > isolation.maxWriteID {
stopAfter = index - (writeIdsToConsider - c.chunk.NumSamples())
writeID := it.At()
if _, ok := isoState.incompleteWrites[writeID]; ok || writeID > isoState.maxWriteID {
stopAfter = c.chunk.NumSamples() - (writeIdsToConsider - index)
if stopAfter < 0 {
stopAfter = 0 // Stopped in a previous chunk.
}
break
}
pos++
if pos == len(s.writeIDs) {
pos = 0
}

it.Next()
}
}

Expand Down
14 changes: 7 additions & 7 deletions head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func TestMemSeriesIsolation(t *testing.T) {
idx, err := hb.Index()
testutil.Ok(t, err)

iso := hb.IsolationState()
iso := hb.iso.State()
iso.maxWriteID = maxWriteId

querier := &blockQuerier{
Expand Down Expand Up @@ -860,12 +860,12 @@ func TestHead_Truncate_WriteIDs(t *testing.T) {
{minTime: 1000, maxTime: 1999, chunk: chk},
}

s1.writeIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1}
s1.writeIDFirst = 7
s1.writeIDCount = 5
s1.txs.txIDs = []uint64{2, 3, 4, 5, 0, 0, 0, 1}
s1.txs.txIDFirst = 7
s1.txs.txIDCount = 5

testutil.Ok(t, h.Truncate(1000))
testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.writeIDs)
testutil.Equals(t, 0, s1.writeIDFirst)
testutil.Equals(t, 3, s1.writeIDCount)
testutil.Equals(t, []uint64{3, 4, 5, 0}, s1.txs.txIDs)
testutil.Equals(t, 0, s1.txs.txIDFirst)
testutil.Equals(t, 3, s1.txs.txIDCount)
}
Loading

0 comments on commit 401f50d

Please sign in to comment.