Skip to content

Commit

Permalink
Refactor tsdb/chunks/chunks.go for future PRs (prometheus#6754)
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <[email protected]>
  • Loading branch information
codesome authored Feb 5, 2020
1 parent 56bf0ee commit 0a27df9
Showing 1 changed file with 40 additions and 25 deletions.
65 changes: 40 additions & 25 deletions tsdb/chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,40 +190,50 @@ func (w *Writer) cut() error {
return err
}

p, _, err := nextSequenceFile(w.dirFile.Name())
n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize)
if err != nil {
return err
}
w.n = int64(n)

w.files = append(w.files, f)
if w.wbuf != nil {
w.wbuf.Reset(f)
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
}

return nil
}

func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) {
p, seq, err := nextSequenceFile(dirFile.Name())
if err != nil {
return 0, nil, 0, err
}
f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
return 0, nil, 0, err
}
if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
return err
if segmentSize > 0 {
if err = fileutil.Preallocate(f, segmentSize, true); err != nil {
return 0, nil, 0, err
}
}
if err = w.dirFile.Sync(); err != nil {
return err
if err = dirFile.Sync(); err != nil {
return 0, nil, 0, err
}

// Write header metadata for new file.
metab := make([]byte, SegmentHeaderSize)
binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks)
metab[4] = chunksFormatV1
metab[4] = chunksFormat

n, err := f.Write(metab)
if err != nil {
return err
return 0, nil, 0, err
}
w.n = int64(n)

w.files = append(w.files, f)
if w.wbuf != nil {
w.wbuf.Reset(f)
} else {
w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
}

return nil
return n, f, seq, nil
}

func (w *Writer) write(b []byte) error {
Expand Down Expand Up @@ -464,8 +474,6 @@ type Reader struct {

func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := Reader{pool: pool, bs: bs, cs: cs}
var totalSize int64

for i, b := range cr.bs {
if b.Len() < SegmentHeaderSize {
return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i)
Expand All @@ -479,9 +487,8 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err
if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return nil, errors.Errorf("invalid chunk format version %d", v)
}
totalSize += int64(b.Len())
cr.size += int64(b.Len())
}
cr.size = totalSize
return &cr, nil
}

Expand Down Expand Up @@ -594,9 +601,18 @@ func nextSequenceFile(dir string) (string, int, error) {
if err != nil {
continue
}
i = j
// It is not necessary that we find the files in number order,
// for example with '1000000' and '200000', '1000000' would come first.
// Though this is a very rare case, we check anyway for the max id.
if j > i {
i = j
}
}
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
return segmentFile(dir, int(i+1)), int(i + 1), nil
}

func segmentFile(baseDir string, index int) string {
return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index))
}

func sequenceFiles(dir string) ([]string, error) {
Expand All @@ -605,7 +621,6 @@ func sequenceFiles(dir string) ([]string, error) {
return nil, err
}
var res []string

for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
Expand Down

0 comments on commit 0a27df9

Please sign in to comment.