From 0a27df92f091a7d7eb6087892f307c72e70b2f0d Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Date: Wed, 5 Feb 2020 19:09:40 +0530 Subject: [PATCH] Refactor tsdb/chunks/chunks.go for future PRs (#6754) Signed-off-by: Ganesh Vernekar --- tsdb/chunks/chunks.go | 65 ++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 1a11924d1da..9b332f9710e 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -190,40 +190,50 @@ func (w *Writer) cut() error { return err } - p, _, err := nextSequenceFile(w.dirFile.Name()) + n, f, _, err := cutSegmentFile(w.dirFile, chunksFormatV1, w.segmentSize) if err != nil { return err } + w.n = int64(n) + + w.files = append(w.files, f) + if w.wbuf != nil { + w.wbuf.Reset(f) + } else { + w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) + } + + return nil +} + +func cutSegmentFile(dirFile *os.File, chunksFormat byte, segmentSize int64) (headerSize int, newFile *os.File, seq int, err error) { + p, seq, err := nextSequenceFile(dirFile.Name()) + if err != nil { + return 0, nil, 0, err + } f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666) if err != nil { - return err + return 0, nil, 0, err } - if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { - return err + if segmentSize > 0 { + if err = fileutil.Preallocate(f, segmentSize, true); err != nil { + return 0, nil, 0, err + } } - if err = w.dirFile.Sync(); err != nil { - return err + if err = dirFile.Sync(); err != nil { + return 0, nil, 0, err } // Write header metadata for new file. 
metab := make([]byte, SegmentHeaderSize) binary.BigEndian.PutUint32(metab[:MagicChunksSize], MagicChunks) - metab[4] = chunksFormatV1 + metab[4] = chunksFormat n, err := f.Write(metab) if err != nil { - return err + return 0, nil, 0, err } - w.n = int64(n) - - w.files = append(w.files, f) - if w.wbuf != nil { - w.wbuf.Reset(f) - } else { - w.wbuf = bufio.NewWriterSize(f, 8*1024*1024) - } - - return nil + return n, f, seq, nil } func (w *Writer) write(b []byte) error { @@ -464,8 +474,6 @@ type Reader struct { func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} - var totalSize int64 - for i, b := range cr.bs { if b.Len() < SegmentHeaderSize { return nil, errors.Wrapf(errInvalidSize, "invalid segment header in segment %d", i) } @@ -479,9 +487,8 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if v := int(b.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { return nil, errors.Errorf("invalid chunk format version %d", v) } - totalSize += int64(b.Len()) + cr.size += int64(b.Len()) } - cr.size = totalSize return &cr, nil } @@ -594,9 +601,18 @@ func nextSequenceFile(dir string) (string, int, error) { if err != nil { continue } - i = j + // It is not necessary that we find the files in number order, + // for example with '1000000' and '200000', '1000000' would come first. + // Though this is a very rare case, we check anyway for the max id. 
+ if j > i { + i = j + } } - return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil + return segmentFile(dir, int(i+1)), int(i + 1), nil +} + +func segmentFile(baseDir string, index int) string { + return filepath.Join(baseDir, fmt.Sprintf("%0.6d", index)) } func sequenceFiles(dir string) ([]string, error) { @@ -605,7 +621,6 @@ func sequenceFiles(dir string) ([]string, error) { return nil, err } var res []string - for _, fi := range files { if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil { continue