This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Avoid chunk allocations and refactor compactions #118

Merged 3 commits on Aug 10, 2017
block.go: 9 changes (5 additions, 4 deletions)
@@ -22,6 +22,7 @@ import (

"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels"
)

@@ -112,7 +113,7 @@ type BlockStats struct {
type BlockMetaCompaction struct {
// Maximum number of compaction cycles any source block has
// gone through.
Generation int `json:"generation"`
Level int `json:"level"`
Collaborator:
How would this affect old data? The Level for the older blocks would be 0, so I don't think any of them will be compacted, and we could load the wrong heads.

That's okay since we moved to "block-ranges" anyway, but I think this should be noted when we release.

Contributor Author:
We are in beta... it's just a breaking change .__.
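
For context, a minimal sketch (mine, not part of the PR) of why old blocks end up at Level 0: encoding/json leaves fields that are absent from the input at their zero value, so an old meta.json that only carries the removed "generation" key decodes to Level 0 under the new struct.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed copy of the struct from this diff; only the renamed field is shown.
type BlockMetaCompaction struct {
	Level int `json:"level"`
}

func main() {
	oldMeta := []byte(`{"generation": 3}`) // written before the rename
	var c BlockMetaCompaction
	if err := json.Unmarshal(oldMeta, &c); err != nil {
		panic(err)
	}
	fmt.Println(c.Level) // prints 0; the old generation count is silently dropped
}
```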

// ULIDs of all source head blocks that went into the block.
Sources []ulid.ULID `json:"sources,omitempty"`
}
@@ -181,13 +182,13 @@ type persistedBlock struct {
tombstones tombstoneReader
}

func newPersistedBlock(dir string) (*persistedBlock, error) {
func newPersistedBlock(dir string, pool chunks.Pool) (*persistedBlock, error) {
meta, err := readMetaFile(dir)
if err != nil {
return nil, err
}

cr, err := newChunkReader(chunkDir(dir))
cr, err := newChunkReader(chunkDir(dir), pool)
if err != nil {
return nil, err
}
@@ -252,7 +253,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
stones := map[uint32]intervals{}

var lset labels.Labels
var chks []*ChunkMeta
var chks []ChunkMeta

Outer:
for p.Next() {
chunks.go: 41 changes (20 additions, 21 deletions)
@@ -100,7 +100,7 @@ type ChunkWriter interface {
// must be populated.
// After returning successfully, the Ref fields in the ChunkMetas
// are set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...*ChunkMeta) error
WriteChunks(chunks ...ChunkMeta) error

// Close writes any required finalization and closes the resources
// associated with the underlying writer.
@@ -222,7 +222,7 @@ func (w *chunkWriter) write(b []byte) error {
return err
}

func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
@@ -238,23 +238,22 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
}
}

b := make([]byte, binary.MaxVarintLen32)
n := binary.PutUvarint(b, uint64(len(chks)))

if err := w.write(b[:n]); err != nil {
return err
}
seq := uint64(w.seq()) << 32
var (
b = [binary.MaxVarintLen32]byte{}
seq = uint64(w.seq()) << 32
)
for i := range chks {
chk := &chks[i]

for _, chk := range chks {
chk.Ref = seq | uint64(w.n)

n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
Collaborator:
Just curious, how does b[:] affect it?

Contributor Author:
We turned b into an array instead of a slice, so it can be allocated on the stack. I thought the compiler's escape analysis would have done this before too, but apparently it didn't catch it, so I made it explicit.

Anyway, PutUvarint wants a slice, not an array, and [:] gives it exactly that.
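
A small self-contained sketch (my illustration, not code from the PR) of the pattern being discussed: the buffer is a fixed-size array that can stay in the function's stack frame, and b[:] hands binary.PutUvarint a slice over that same storage.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func encodeLen(length uint64) []byte {
	// An array, not a slice: no make() call, and the storage can live in the
	// stack frame unless escape analysis decides it must move to the heap.
	var b [binary.MaxVarintLen32]byte

	// PutUvarint takes a []byte, so b[:] views the array as a slice backed
	// by the same bytes.
	n := binary.PutUvarint(b[:], length)
	return append([]byte(nil), b[:n]...) // copy out only the encoded prefix
}

func main() {
	fmt.Println(encodeLen(300)) // [172 2]
}
```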


if err := w.write(b[:n]); err != nil {
return err
}
if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
b[0] = byte(chk.Chunk.Encoding())
if err := w.write(b[:1]); err != nil {
return err
}
if err := w.write(chk.Chunk.Bytes()); err != nil {
@@ -265,7 +264,7 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
if err := chk.writeHash(w.crc32); err != nil {
return err
}
if err := w.write(w.crc32.Sum(nil)); err != nil {
if err := w.write(w.crc32.Sum(b[:0])); err != nil {
Collaborator:
Curious again, how does this affect it?

Contributor Author:
Same as above.
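
To spell out the "same as above" (my note, not from the thread): hash.Hash.Sum appends the checksum to the slice it is given, so Sum(nil) allocates a fresh slice per call, while Sum(b[:0]) appends into the array b the writer already holds. A minimal sketch under that assumption:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

func main() {
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
	h.Write([]byte("some chunk bytes"))

	var b [16]byte

	sumAlloc := h.Sum(nil)   // allocates a new 4-byte slice on every call
	sumReuse := h.Sum(b[:0]) // appends the 4 checksum bytes into b's backing array

	fmt.Println(sumAlloc, sumReuse) // same checksum either way
}
```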

return err
}
}
@@ -298,15 +297,20 @@ type chunkReader struct {

// Closers for resources behind the byte slices.
cs []io.Closer

pool chunks.Pool
}

// newChunkReader returns a new chunkReader based on mmaped files found in dir.
func newChunkReader(dir string) (*chunkReader, error) {
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
files, err := sequenceFiles(dir, "")
if err != nil {
return nil, err
}
var cr chunkReader
if pool == nil {
pool = chunks.NewPool()
}
cr := chunkReader{pool: pool}

for _, fn := range files {
f, err := openMmapFile(fn)
@@ -353,11 +357,6 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
return nil, fmt.Errorf("reading chunk length failed")
}
b = b[n:]
enc := chunks.Encoding(b[0])

c, err := chunks.FromData(enc, b[1:1+l])
if err != nil {
return nil, err
}
return c, nil
return s.pool.Get(chunks.Encoding(b[0]), b[1:1+l])
}
chunks/chunk.go: 57 changes (56 additions, 1 deletion)
@@ -13,7 +13,12 @@

package chunks

import "fmt"
import (
"fmt"
"sync"

"github.com/pkg/errors"
)

// Encoding is the identifier for a chunk encoding.
type Encoding uint8
@@ -63,3 +68,53 @@ type Iterator interface {
Err() error
Next() bool
}

type Pool interface {
Put(Chunk) error
Get(e Encoding, b []byte) (Chunk, error)
}

// Pool is a memory pool of chunk objects.
type pool struct {
xor sync.Pool
}

func NewPool() Pool {
return &pool{
xor: sync.Pool{
New: func() interface{} {
return &XORChunk{b: &bstream{}}
},
},
}
}

func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
switch e {
case EncXOR:
c := p.xor.Get().(*XORChunk)
c.b.stream = b
c.b.count = 0
return c, nil
}
return nil, errors.Errorf("invalid encoding %q", e)
}

func (p *pool) Put(c Chunk) error {
switch c.Encoding() {
case EncXOR:
xc, ok := c.(*XORChunk)
// This may happen often with wrapped chunks. Nothing we can really do about
// it but returning an error would cause a lot of allocations again. Thus,
// we just skip it.
if !ok {
return nil
}
xc.b.stream = nil
xc.b.count = 0
p.xor.Put(c)
default:
return errors.Errorf("invalid encoding %q", c.Encoding())
}
return nil
}
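
A hedged usage sketch of the pool added above (the chunks package names come from this diff; the helper function and the source of the raw bytes are assumptions of mine): callers decode through Get instead of chunks.FromData and hand chunks back with Put so the *XORChunk wrappers can be reused.

```go
package main

import (
	"fmt"

	"github.com/prometheus/tsdb/chunks"
)

// readOnce is a hypothetical helper showing the intended Get/Put pairing.
func readOnce(pool chunks.Pool, raw []byte) error {
	// Decode through the pool, reusing a previously released *XORChunk if any.
	c, err := pool.Get(chunks.EncXOR, raw)
	if err != nil {
		return err
	}
	fmt.Println("chunk bytes:", len(c.Bytes()))

	// Return the chunk so the next Get can reuse its allocation.
	return pool.Put(c)
}

func main() {
	pool := chunks.NewPool()
	// raw would normally come from a mmapped segment file; empty here.
	_ = readOnce(pool, nil)
}
```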