Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wal): Add sizing information to writer and reader. #13267

Merged
merged 31 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0e5aa15
wip
cyriltovena May 16, 2024
2ef5c3c
wip
cyriltovena May 16, 2024
d68a08d
wip
cyriltovena May 28, 2024
144bb9c
add some doc and vision
cyriltovena May 30, 2024
5f8cf08
move compressed len to chunk
cyriltovena May 30, 2024
f32c755
work on the chunk encoding
cyriltovena May 31, 2024
19bbd76
missing changes
cyriltovena May 31, 2024
9e1d5b1
working on fixes and tests
cyriltovena May 31, 2024
c8b792f
add more tests and found a bug with dod
cyriltovena Jun 2, 2024
749acf7
fix(wal): Use varint encoding for ts_2_dod in WAL format
cyriltovena Jun 3, 2024
7590f55
refactor(wal): Remove unnecessary code in writeChunk function
cyriltovena Jun 3, 2024
7991408
chore: Refactor ChunkReader to improve performance and memory usage
cyriltovena Jun 3, 2024
38fcad4
chore: Add more realistic tests and benchmarks
cyriltovena Jun 3, 2024
bdf389f
refactor: Update index writer to support in memory buffer.
cyriltovena Jun 5, 2024
296daee
pausing work I need a new index different than the current work
cyriltovena Jun 7, 2024
d1cfcae
Add a special in memory index for the wal package
cyriltovena Jun 10, 2024
37ea6d6
Finalize writing and start reading index
cyriltovena Jun 10, 2024
d649646
Add offset/start to chunk ref
cyriltovena Jun 10, 2024
fd1dbd8
wip
cyriltovena Jun 13, 2024
b49d2ba
refactor(wal): Implement SeriesIter.
cyriltovena Jun 16, 2024
f575efb
fix(wal): Fixes snappy block offsets counting.
cyriltovena Jun 17, 2024
071ee04
chore: update format doc to reflect latest changes
cyriltovena Jun 18, 2024
6227361
chore: lint
cyriltovena Jun 18, 2024
f625252
refactor: Removes changes not required.
cyriltovena Jun 18, 2024
32b1d2c
chore: format
cyriltovena Jun 18, 2024
d3f179e
feat(wal): Add sizing information to writer and reader.
cyriltovena Jun 19, 2024
57ad53b
Merge remote-tracking branch 'upstream/main' into wal-sizing
cyriltovena Jun 19, 2024
d7dc2b1
Merge remote-tracking branch 'upstream/main' into wal-sizing
cyriltovena Jun 19, 2024
95c1015
lint
cyriltovena Jun 20, 2024
b6ba673
ensure stable test
cyriltovena Jun 20, 2024
57d912b
Merge branch 'main' into wal-sizing
cyriltovena Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/pattern/iter/batch_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iter

import (
"sort"
"testing"

"github.com/go-kit/log"
Expand Down Expand Up @@ -211,6 +212,12 @@ func TestReadMetricsBatch(t *testing.T) {
it := NewSumMergeSampleIterator(tt.seriesIter)
got, err := ReadMetricsBatch(it, tt.batchSize, log.NewNopLogger())
require.NoError(t, err)
sort.Slice(tt.expected.Series, func(i, j int) bool {
return tt.expected.Series[i].Labels < tt.expected.Series[j].Labels
})
sort.Slice(got.Series, func(i, j int) bool {
return got.Series[i].Labels < got.Series[j].Labels
})
require.Equal(t, tt.expected.Series, got.Series)
})
}
Expand Down
70 changes: 7 additions & 63 deletions pkg/storage/wal/chunks/chunks_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package chunks

import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
)

func TestChunkReaderWriter(t *testing.T) {
Expand Down Expand Up @@ -121,11 +119,11 @@ func TestChunkReaderWriter(t *testing.T) {
}

func TestChunkReaderWriterWithLogGenerator(t *testing.T) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
t.Run(filename, func(t *testing.T) {
gen := newLogGenerator(t, filename)
gen := testdata.NewLogGenerator(t, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -196,10 +194,10 @@ var (

// Benchmark reads with log generator
func BenchmarkReadChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()
for _, filename := range filenames {
b.Run(filename, func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -239,12 +237,12 @@ func BenchmarkReadChunkWithLogGenerator(b *testing.B) {

// Benchmark with log generator
func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
filenames := testDataFile()
filenames := testdata.Files()

for _, filename := range filenames {
for _, count := range []int{1000, 10000, 100000} {
b.Run(fmt.Sprintf("%s-%d", filename, count), func(b *testing.B) {
gen := newLogGenerator(b, filename)
gen := testdata.NewLogGenerator(b, filename)
defer gen.Close()

var entries []*logproto.Entry
Expand Down Expand Up @@ -278,24 +276,6 @@ func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
}
}

func testDataFile() []string {
testdataDir := "../testdata"
files, err := os.ReadDir(testdataDir)
if err != nil {
panic(err)
}

var fileNames []string
for _, file := range files {
if !file.IsDir() {
filePath := filepath.Join(testdataDir, file.Name())
fileNames = append(fileNames, filePath)
}
}

return fileNames
}

// generateLogEntries generates a slice of logproto.Entry with the given count.
func generateLogEntries(count int) []*logproto.Entry {
entries := make([]*logproto.Entry, count)
Expand All @@ -307,39 +287,3 @@ func generateLogEntries(count int) []*logproto.Entry {
}
return entries
}

type logGenerator struct {
f *os.File
s *bufio.Scanner
}

func (g *logGenerator) Next() (bool, []byte) {
if g.s.Scan() {
return true, g.s.Bytes()
}
g.reset()
return g.s.Scan(), g.s.Bytes()
}

func (g *logGenerator) Close() {
if g.f != nil {
g.f.Close()
}
g.f = nil
}

func (g *logGenerator) reset() {
_, _ = g.f.Seek(0, 0)
g.s = bufio.NewScanner(g.f)
}

func newLogGenerator(t testing.TB, filename string) *logGenerator {
t.Helper()
file, err := os.Open(filename)
require.NoError(t, err)

return &logGenerator{
f: file,
s: bufio.NewScanner(file),
}
}
35 changes: 33 additions & 2 deletions pkg/storage/wal/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ type streamID struct {
}

type SegmentWriter struct {
streams *swiss.Map[streamID, *streamSegment]
buf1 encoding.Encbuf
streams *swiss.Map[streamID, *streamSegment]
buf1 encoding.Encbuf
inputSize int64
}

type streamSegment struct {
Expand All @@ -61,6 +62,9 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
if len(entries) == 0 {
return
}
for _, e := range entries {
b.inputSize += int64(len(e.Line))
}
id := streamID{labels: labelsString, tenant: tenantID}
s, ok := b.streams.Get(id)
if !ok {
Expand Down Expand Up @@ -224,6 +228,13 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
func (b *SegmentWriter) Reset() {
b.streams.Clear()
b.buf1.Reset()
b.inputSize = 0
}

// InputSize returns the total size of the input data written to the writer.
// It doesn't account for timestamps and labels.
func (b *SegmentWriter) InputSize() int64 {
return b.inputSize
}

type SegmentReader struct {
Expand Down Expand Up @@ -332,3 +343,23 @@ func (r *SegmentReader) Series(ctx context.Context) (*SeriesIter, error) {

return NewSeriesIter(r.idr, ps, r.b), nil
}

type Sizes struct {
Index int64
Series []int64
}

func (r *SegmentReader) Sizes() (Sizes, error) {
var sizes Sizes
sizes.Index = r.idr.Size()
it, err := r.Series(context.Background())
if err != nil {
return sizes, err
}
sizes.Series = []int64{}
for it.Next() {
_, size := it.chunksMeta[0].Ref.Unpack()
sizes.Series = append(sizes.Series, int64(size))
}
return sizes, err
}
67 changes: 67 additions & 0 deletions pkg/storage/wal/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (
"testing"
"time"

"github.com/dustin/go-humanize"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/wal/testdata"

"github.com/grafana/loki/pkg/push"
)
Expand Down Expand Up @@ -186,3 +188,68 @@ func TestMultiTenantWrite(t *testing.T) {
require.NoError(t, iter.Err())
require.ElementsMatch(t, expectedSeries, actualSeries)
}

func TestCompression(t *testing.T) {
size := []int64{250 * 1024, 500 * 1024, 750 * 1024, 1 << 20, 2 << 20, 5 << 20, 10 << 20, 20 << 20, 50 << 20, 100 << 20}
for _, s := range size {
t.Run(fmt.Sprintf("size %.2f", float64(s)/(1024*1024)), func(t *testing.T) {
testCompression(t, s)
})
}
}

func testCompression(t *testing.T, maxInputSize int64) {
w := NewWalSegmentWriter()
dst := bytes.NewBuffer(nil)
files := testdata.Files()
lbls := []labels.Labels{}
generators := []*testdata.LogGenerator{}

for _, file := range files {
lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "dev"))
lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "prod"))
g := testdata.NewLogGenerator(t, file)
generators = append(generators, g, g)
}
inputSize := int64(0)
for inputSize < maxInputSize {
for i, lbl := range lbls {
more, line := generators[i].Next()
if !more {
continue
}
inputSize += int64(len(line))
w.Append("tenant", lbl.String(), lbl, []*push.Entry{
{Timestamp: time.Unix(0, int64(i*1e9)), Line: string(line)},
})
}
}

require.Equal(t, inputSize, w.InputSize())

now := time.Now()
n, err := w.WriteTo(dst)
require.NoError(t, err)
require.True(t, n > 0)
compressionTime := time.Since(now)

r, err := NewReader(dst.Bytes())
require.NoError(t, err)
inputSizeMB := float64(w.InputSize()) / (1024 * 1024)
outputSizeMB := float64(dst.Len()) / (1024 * 1024)
compressionRatio := (1 - (outputSizeMB / inputSizeMB)) * 100

t.Logf("Input Size: %s\n", humanize.Bytes(uint64(w.InputSize())))
t.Logf("Output Size: %s\n", humanize.Bytes(uint64(dst.Len())))
t.Logf("Compression Ratio: %.2f%%\n", compressionRatio)
t.Logf("Write time: %s\n", compressionTime)
sizes, err := r.Sizes()
require.NoError(t, err)
t.Logf("Total chunks %d\n", len(sizes.Series))
t.Logf("Index size %s\n", humanize.Bytes(uint64(sizes.Index)))
sizesString := ""
for _, size := range sizes.Series {
sizesString += humanize.Bytes(uint64(size)) + ", "
}
t.Logf("Series sizes: [%s]\n", sizesString)
}
71 changes: 71 additions & 0 deletions pkg/storage/wal/testdata/generator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package testdata

import (
"bufio"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/require"
)

type LogGenerator struct {
f *os.File
s *bufio.Scanner
}

func (g *LogGenerator) Next() (bool, []byte) {
if g.s.Scan() {
return true, g.s.Bytes()
}
g.reset()
return g.s.Scan(), g.s.Bytes()
}

func (g *LogGenerator) Close() {
if g.f != nil {
g.f.Close()
}
g.f = nil
}

func (g *LogGenerator) reset() {
_, _ = g.f.Seek(0, 0)
g.s = bufio.NewScanner(g.f)
}

func NewLogGenerator(t testing.TB, filename string) *LogGenerator {
t.Helper()
file, err := os.Open(filename)
require.NoError(t, err)

return &LogGenerator{
f: file,
s: bufio.NewScanner(file),
}
}

func Files() []string {
testdataDir := "./testdata"
files, err := os.ReadDir(testdataDir)
if err != nil && !os.IsNotExist(err) {
if !os.IsNotExist(err) {
panic(err)
}
testdataDir = "../testdata"
files, err = os.ReadDir(testdataDir)
if err != nil {
panic(err)
}
}

var fileNames []string
for _, file := range files {
if !file.IsDir() {
filePath := filepath.Join(testdataDir, file.Name())
fileNames = append(fileNames, filePath)
}
}

return fileNames
}
Loading