feat: remove mutexes from wal.SegmentWriter (#13641)
grobinson-grafana authored Jul 24, 2024
1 parent 8cb19a2 commit 7ed63ea
Showing 2 changed files with 12 additions and 186 deletions.
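With the per-stream and writer-level mutexes removed, SegmentWriter presumably no longer synchronizes access to its streams map itself, so any caller that still appends from multiple goroutines would need to serialize access on its own. A minimal caller-side guard is sketched below; the lockedWriter type is hypothetical and not part of this change — only the Append signature and the labels/push types appear in the diff, and the import paths are assumed.

// A minimal caller-side guard, assuming SegmentWriter is no longer safe for
// concurrent use once its internal mutexes are gone. The lockedWriter type is
// hypothetical; only Append's signature comes from this commit.
package wal

import (
	"sync"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/push"
)

type lockedWriter struct {
	mu sync.Mutex
	w  *SegmentWriter
}

// Append serializes writes so that only one goroutine touches the underlying
// SegmentWriter at a time.
func (l *lockedWriter) Append(tenant string, lbls labels.Labels, entries []*push.Entry) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.w.Append(tenant, lbls.String(), lbls, entries)
}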
40 changes: 12 additions & 28 deletions pkg/storage/wal/segment.go
@@ -30,7 +30,6 @@ var (
 	streamSegmentPool = sync.Pool{
 		New: func() interface{} {
 			return &streamSegment{
-				lock:    &sync.Mutex{},
 				entries: make([]*logproto.Entry, 0, 4096),
 			}
 		},
@@ -47,18 +46,16 @@ type streamID struct {
 }

 type SegmentWriter struct {
-	metrics        *SegmentMetrics
-	streams        map[streamID]*streamSegment
-	buf1           encoding.Encbuf
-	outputSize     atomic.Int64
-	inputSize      atomic.Int64
-	idxWriter      *index.Writer
-	consistencyMtx *sync.RWMutex
-	indexRef       metastorepb.DataRef
+	metrics    *SegmentMetrics
+	streams    map[streamID]*streamSegment
+	buf1       encoding.Encbuf
+	outputSize atomic.Int64
+	inputSize  atomic.Int64
+	idxWriter  *index.Writer
+	indexRef   metastorepb.DataRef
 }

 type streamSegment struct {
-	lock     *sync.Mutex
 	lbls     labels.Labels
 	entries  []*logproto.Entry
 	tenantID string
@@ -86,24 +83,19 @@ func NewWalSegmentWriter(m *SegmentMetrics) (*SegmentWriter, error) {
 		return nil, err
 	}
 	return &SegmentWriter{
-		metrics:        m,
-		streams:        make(map[streamID]*streamSegment, 64),
-		buf1:           encoding.EncWith(make([]byte, 0, 4)),
-		idxWriter:      idxWriter,
-		inputSize:      atomic.Int64{},
-		consistencyMtx: &sync.RWMutex{},
+		metrics:   m,
+		streams:   make(map[streamID]*streamSegment, 64),
+		buf1:      encoding.EncWith(make([]byte, 0, 4)),
+		idxWriter: idxWriter,
+		inputSize: atomic.Int64{},
 	}, nil
 }

 func (b *SegmentWriter) getOrCreateStream(id streamID, lbls labels.Labels) *streamSegment {
-	b.consistencyMtx.RLock()
 	s, ok := b.streams[id]
-	b.consistencyMtx.RUnlock()
 	if ok {
 		return s
 	}
-	b.consistencyMtx.Lock()
-	defer b.consistencyMtx.Unlock()
 	// Check another thread has not created it
 	s, ok = b.streams[id]
 	if ok {
@@ -130,8 +122,6 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 	id := streamID{labels: labelsString, tenant: tenantID}
 	s := b.getOrCreateStream(id, lbls)

-	s.lock.Lock()
-	defer s.lock.Unlock()
 	for i, e := range entries {
 		if e.Timestamp.UnixNano() >= s.maxt {
 			s.entries = append(s.entries, entries[i])
@@ -152,9 +142,6 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 // ReportMetrics for the writer. If called before WriteTo then the output size
 // histogram will observe 0.
 func (b *SegmentWriter) ReportMetrics() {
-	b.consistencyMtx.Lock()
-	defer b.consistencyMtx.Unlock()
-
 	b.metrics.streams.Observe(float64(len(b.streams)))
 	tenants := make(map[string]struct{}, 64)
 	for _, s := range b.streams {
@@ -166,9 +153,6 @@ func (b *SegmentWriter) ReportMetrics() {
 }

 func (b *SegmentWriter) Meta(id string) *metastorepb.BlockMeta {
-	b.consistencyMtx.Lock()
-	defer b.consistencyMtx.Unlock()
-
 	var globalMinT, globalMaxT int64

 	tenants := make(map[string]*metastorepb.TenantStreams, 64)
158 changes: 0 additions & 158 deletions pkg/storage/wal/segment_test.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"fmt"
 	"sort"
-	"sync"
 	"testing"
 	"time"

@@ -132,163 +131,6 @@ func TestWalSegmentWriter_Append(t *testing.T) {
 	}
 }

-func BenchmarkConcurrentAppends(t *testing.B) {
-	type appendArgs struct {
-		tenant  string
-		labels  labels.Labels
-		entries []*push.Entry
-	}
-
-	lbls := []labels.Labels{
-		labels.FromStrings("container", "foo", "namespace", "dev"),
-		labels.FromStrings("container", "bar", "namespace", "staging"),
-		labels.FromStrings("container", "bar", "namespace", "prod"),
-	}
-	characters := "abcdefghijklmnopqrstuvwxyz"
-	tenants := []string{}
-	// 676 unique tenants (26^2)
-	for i := 0; i < len(characters); i++ {
-		for j := 0; j < len(characters); j++ {
-			tenants = append(tenants, string(characters[i])+string(characters[j]))
-		}
-	}
-
-	workChan := make(chan *appendArgs)
-	var wg sync.WaitGroup
-	var w *SegmentWriter
-	for i := 0; i < 100; i++ {
-		wg.Add(1)
-		go func(i int) {
-			for args := range workChan {
-				w.Append(args.tenant, args.labels.String(), args.labels, args.entries)
-			}
-			wg.Done()
-		}(i)
-	}
-
-	t.ResetTimer()
-	for i := 0; i < t.N; i++ {
-		var err error
-		w, err = NewWalSegmentWriter(NewSegmentMetrics(nil))
-		require.NoError(t, err)
-
-		for _, lbl := range lbls {
-			for _, r := range tenants {
-				for i := 0; i < 10; i++ {
-					workChan <- &appendArgs{
-						tenant: r,
-						labels: lbl,
-						entries: []*push.Entry{
-							{Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf("log line %d", i)},
-						},
-					}
-				}
-			}
-		}
-	}
-	close(workChan)
-	wg.Wait()
-}
-
-func TestConcurrentAppends(t *testing.T) {
-	type appendArgs struct {
-		tenant  string
-		labels  labels.Labels
-		entries []*push.Entry
-	}
-	dst := bytes.NewBuffer(nil)
-
-	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
-	require.NoError(t, err)
-	var wg sync.WaitGroup
-	workChan := make(chan *appendArgs, 100)
-	for i := 0; i < 100; i++ {
-		wg.Add(1)
-		go func(i int) {
-			for args := range workChan {
-				w.Append(args.tenant, args.labels.String(), args.labels, args.entries)
-			}
-			wg.Done()
-		}(i)
-	}
-
-	lbls := []labels.Labels{
-		labels.FromStrings("container", "foo", "namespace", "dev"),
-		labels.FromStrings("container", "bar", "namespace", "staging"),
-		labels.FromStrings("container", "bar", "namespace", "prod"),
-	}
-	characters := "abcdefghijklmnopqrstuvwxyz"
-	tenants := []string{}
-	// 676 unique tenants (26^2)
-	for i := 0; i < len(characters); i++ {
-		for j := 0; j < len(characters); j++ {
-			for k := 0; k < len(characters); k++ {
-				tenants = append(tenants, string(characters[i])+string(characters[j])+string(characters[k]))
-			}
-		}
-	}
-
-	msgsPerSeries := 10
-	msgsGenerated := 0
-	for _, r := range tenants {
-		for _, lbl := range lbls {
-			for i := 0; i < msgsPerSeries; i++ {
-				msgsGenerated++
-				workChan <- &appendArgs{
-					tenant: r,
-					labels: lbl,
-					entries: []*push.Entry{
-						{Timestamp: time.Unix(0, int64(i)), Line: fmt.Sprintf("log line %d", i)},
-					},
-				}
-			}
-		}
-	}
-	close(workChan)
-	wg.Wait()
-
-	n, err := w.WriteTo(dst)
-	require.NoError(t, err)
-	require.True(t, n > 0)
-
-	r, err := NewReader(dst.Bytes())
-	require.NoError(t, err)
-
-	iter, err := r.Series(context.Background())
-	require.NoError(t, err)
-
-	var expectedSeries, actualSeries []string
-
-	for _, tenant := range tenants {
-		for _, lbl := range lbls {
-			expectedSeries = append(expectedSeries, labels.NewBuilder(lbl).Set(tenantLabel, tenant).Labels().String())
-		}
-	}
-
-	msgsRead := 0
-	for iter.Next() {
-		actualSeries = append(actualSeries, iter.At().String())
-		chk, err := iter.ChunkReader(nil)
-		require.NoError(t, err)
-		// verify all lines
-		var i int
-		for chk.Next() {
-			ts, line := chk.At()
-			require.Equal(t, int64(i), ts)
-			require.Equal(t, fmt.Sprintf("log line %d", i), string(line))
-			msgsRead++
-			i++
-		}
-		require.NoError(t, chk.Err())
-		require.NoError(t, chk.Close())
-		require.Equal(t, msgsPerSeries, i)
-	}
-	require.NoError(t, iter.Err())
-	require.ElementsMatch(t, expectedSeries, actualSeries)
-	require.Equal(t, msgsGenerated, msgsRead)
-	t.Logf("Generated %d messages between %d tenants", msgsGenerated, len(tenants))
-}
-
 func TestMultiTenantWrite(t *testing.T) {
 	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
 	require.NoError(t, err)
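With the locks gone, the concurrent benchmark and test removed above go away as well, so measuring the append path presumably falls to a single-goroutine benchmark. The sketch below is illustrative only — the BenchmarkSequentialAppends name, tenant value, and loop shape are assumptions, while NewWalSegmentWriter, NewSegmentMetrics, Append, and the push.Entry fields come from this diff.

// A hypothetical single-goroutine benchmark for the append path, sketched as a
// replacement for the removed BenchmarkConcurrentAppends; it assumes it sits in
// the same test file, so testing, time, labels, push, and require are already
// imported.
func BenchmarkSequentialAppends(b *testing.B) {
	lbls := labels.FromStrings("container", "foo", "namespace", "dev")
	entries := []*push.Entry{
		{Timestamp: time.Unix(0, 1), Line: "log line 1"},
	}

	w, err := NewWalSegmentWriter(NewSegmentMetrics(nil))
	require.NoError(b, err)

	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Append is exercised from a single goroutine; no worker pool or
		// channel fan-out is needed now that the writer holds no locks.
		w.Append("tenant-a", lbls.String(), lbls, entries)
	}
}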