Skip to content

Commit

Permalink
Improve test coverage of WAL Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
grobinson-grafana committed Jul 11, 2024
1 parent 583f7f3 commit 5ab5b65
Showing 1 changed file with 167 additions and 84 deletions.
251 changes: 167 additions & 84 deletions pkg/storage/wal/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,15 @@ import (

func TestManager_Append(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Append some data.
lbs := labels.Labels{{
Name: "foo",
Value: "bar",
}}
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("a", 1024),
}}
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
Expand All @@ -46,28 +41,30 @@ func TestManager_Append(t *testing.T) {
default:
}

// Flush the data and broadcast that the flush is successful.
it := m.NextPending()
require.NotNil(t, it)
it.Result.SetDone(nil)
// Pretend to flush the data.
res.SetDone(nil)

// Should be able to read from the Done() as it is closed.
// Should be able to read from Done() as it is closed.
select {
case <-res.Done():
default:
t.Fatal("expected closed Done()")
}
require.NoError(t, res.Err())
}

// Return the segment to be written to again.
m.Put(it)
func TestManager_AppendFailed(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Append some more data.
entries = []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 1024),
}}
res, err = m.Append(AppendRequest{
// Append some data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Expand All @@ -76,12 +73,11 @@ func TestManager_Append(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, res)

// Flush the data, but this time broadcast an error that the flush failed.
it = m.NextPending()
require.NotNil(t, it)
it.Result.SetDone(errors.New("failed to flush"))
// Pretend that the flush failed.
res.SetDone(errors.New("failed to flush"))

// Should be able to read from the Done() as it is closed.
// Should be able to read from the Done() as it is closed and assert
// that the error is the expected error.
select {
case <-res.Done():
default:
Expand All @@ -90,25 +86,103 @@ func TestManager_Append(t *testing.T) {
require.EqualError(t, res.Err(), "failed to flush")
}

// This test asserts that Append operations return ErrFull if all segments
// are full and waiting to be flushed.
func TestManager_Append_ErrFull(t *testing.T) {
func TestManager_AppendMaxAge(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 8 * 1024 * 1024, // 8MB
}, NewMetrics(nil))
require.NoError(t, err)

// Append 1B of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// The segment that was just appended to has neither reached the maximum
// age nor maximum size to be flushed.
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// Wait 100ms and append some more data.
time.Sleep(100 * time.Millisecond)
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
res, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// The segment has reached the maximum age and should have been moved to
// pending list to be flushed.
require.Equal(t, 0, m.available.Len())
require.Equal(t, 1, m.pending.Len())
}

func TestManager_AppendMaxSize(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Append 512B of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// The segment that was just appended to has neither reached the maximum
// age nor maximum size to be flushed.
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// Append another 512B of data.
entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}}
res, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
require.NotNil(t, res)

// The segment has reached the maximum size and should have been moved to
// pending list to be flushed.
require.Equal(t, 0, m.available.Len())
require.Equal(t, 1, m.pending.Len())
}

func TestManager_AppendWALFull(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 30 * time.Second,
MaxSegments: 10,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Should be able to write to all 10 segments of 1KB each.
lbs := labels.Labels{{
Name: "foo",
Value: "bar",
}}
// Should be able to write 100KB of data, 10KB per segment.
lbs := labels.Labels{{Name: "a", Value: "b"}}
for i := 0; i < 10; i++ {
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("a", 1024),
}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
Expand All @@ -119,12 +193,9 @@ func TestManager_Append_ErrFull(t *testing.T) {
require.NotNil(t, res)
}

// Append more data should fail as all segments are full and waiting to be
// flushed.
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 1024),
}}
// However, appending more data should fail as all segments are full and
// waiting to be flushed.
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
Expand All @@ -137,76 +208,88 @@ func TestManager_Append_ErrFull(t *testing.T) {

func TestManager_NextPending(t *testing.T) {
m, err := NewManager(Config{
MaxAge: DefaultMaxAge,
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// There should be no items as no data has been written.
// There should be no segments waiting to be flushed as no data has been
// written.
it := m.NextPending()
require.Nil(t, it)

// Append 512B of data. There should still be no items to as the segment is
// not full (1KB).
lbs := labels.Labels{{
Name: "foo",
Value: "bar",
}}
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 512),
}}
// Append 1KB of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
_, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)

// There should be a segment waiting to be flushed.
it = m.NextPending()
require.NotNil(t, it)

// There should be no more segments waiting to be flushed.
it = m.NextPending()
require.Nil(t, it)
}

// Write another 512B of data. There should be an item waiting to be flushed.
entries = []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 512),
}}
_, err = m.Append(AppendRequest{
func TestManager_NexPendingMaxAge(t *testing.T) {
m, err := NewManager(Config{
MaxAge: 100 * time.Millisecond,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// Append 1B of data.
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}}
res, err := m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
LabelsStr: lbs.String(),
Entries: entries,
})
require.NoError(t, err)
it = m.NextPending()
require.NotNil(t, it)
require.NotNil(t, res)

// Should not get the same item more than once.
it = m.NextPending()
// The segment that was just appended to has neither reached the maximum
// age nor maximum size to be flushed.
it := m.NextPending()
require.Nil(t, it)
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// Wait 100ms. The segment that was just appended to should have reached
// the maximum age.
time.Sleep(100 * time.Millisecond)
it = m.NextPending()
require.NotNil(t, it)
require.Equal(t, 0, m.available.Len())
require.Equal(t, 0, m.pending.Len())
}

func TestManager_Put(t *testing.T) {
m, err := NewManager(Config{
MaxSegments: 10,
MaxAge: 30 * time.Second,
MaxSegments: 1,
MaxSegmentSize: 1024, // 1KB
}, NewMetrics(nil))
require.NoError(t, err)

// There should be 10 available segments, and 0 pending.
require.Equal(t, 10, m.available.Len())
// There should be 10 available and 0 pending segments.
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// Append 1KB of data.
lbs := labels.Labels{{
Name: "foo",
Value: "bar",
}}
entries := []*logproto.Entry{{
Timestamp: time.Now(),
Line: strings.Repeat("b", 1024),
}}
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
_, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
Expand All @@ -215,23 +298,23 @@ func TestManager_Put(t *testing.T) {
})
require.NoError(t, err)

// 1 segment is full, so there should now be 9 available segments,
// and 1 pending segment.
require.Equal(t, 9, m.available.Len())
// The segment is full, so there should now be 0 available segments and 1
// pending segment.
require.Equal(t, 0, m.available.Len())
require.Equal(t, 1, m.pending.Len())

// Getting the pending segment should remove it from the list.
it := m.NextPending()
require.NotNil(t, it)
require.Equal(t, 9, m.available.Len())
require.Equal(t, 0, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// The segment should contain 1KB of data.
require.Equal(t, int64(1024), it.Writer.InputSize())

// Putting it back should add it to the available list.
m.Put(it)
require.Equal(t, 10, m.available.Len())
require.Equal(t, 1, m.available.Len())
require.Equal(t, 0, m.pending.Len())

// The segment should be reset.
Expand Down Expand Up @@ -265,8 +348,8 @@ wal_segments_pending 0
require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...))

// Appending 1KB of data.
lbs := labels.Labels{{Name: "foo", Value: "bar"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("b", 1024)}}
lbs := labels.Labels{{Name: "a", Value: "b"}}
entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}}
_, err = m.Append(AppendRequest{
TenantID: "1",
Labels: lbs,
Expand Down

0 comments on commit 5ab5b65

Please sign in to comment.