diff --git a/pkg/storage/wal/manager_test.go b/pkg/storage/wal/manager_test.go index bd120e2fe8cc2..acf89aa81cd99 100644 --- a/pkg/storage/wal/manager_test.go +++ b/pkg/storage/wal/manager_test.go @@ -16,20 +16,15 @@ import ( func TestManager_Append(t *testing.T) { m, err := NewManager(Config{ + MaxAge: 30 * time.Second, MaxSegments: 1, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) // Append some data. - lbs := labels.Labels{{ - Name: "foo", - Value: "bar", - }} - entries := []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("a", 1024), - }} + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -46,28 +41,30 @@ func TestManager_Append(t *testing.T) { default: } - // Flush the data and broadcast that the flush is successful. - it := m.NextPending() - require.NotNil(t, it) - it.Result.SetDone(nil) + // Pretend to flush the data. + res.SetDone(nil) - // Should be able to read from the Done() as it is closed. + // Should be able to read from Done() as it is closed. select { case <-res.Done(): default: t.Fatal("expected closed Done()") } require.NoError(t, res.Err()) +} - // Return the segment to be written to again. - m.Put(it) +func TestManager_AppendFailed(t *testing.T) { + m, err := NewManager(Config{ + MaxAge: 30 * time.Second, + MaxSegments: 1, + MaxSegmentSize: 1024, // 1KB + }, NewMetrics(nil)) + require.NoError(t, err) - // Append some more data. - entries = []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("b", 1024), - }} - res, err = m.Append(AppendRequest{ + // Append some data. 
+ lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} + res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, LabelsStr: lbs.String(), @@ -76,12 +73,11 @@ func TestManager_Append(t *testing.T) { require.NoError(t, err) require.NotNil(t, res) - // Flush the data, but this time broadcast an error that the flush failed. - it = m.NextPending() - require.NotNil(t, it) - it.Result.SetDone(errors.New("failed to flush")) + // Pretend that the flush failed. + res.SetDone(errors.New("failed to flush")) - // Should be able to read from the Done() as it is closed. + // Should be able to read from the Done() as it is closed and assert + // that the error is the expected error. select { case <-res.Done(): default: @@ -90,25 +86,103 @@ func TestManager_Append(t *testing.T) { require.EqualError(t, res.Err(), "failed to flush") } -// This test asserts that Append operations return ErrFull if all segments -// are full and waiting to be flushed. -func TestManager_Append_ErrFull(t *testing.T) { +func TestManager_AppendMaxAge(t *testing.T) { m, err := NewManager(Config{ + MaxAge: 100 * time.Millisecond, + MaxSegments: 1, + MaxSegmentSize: 8 * 1024 * 1024, // 8MB + }, NewMetrics(nil)) + require.NoError(t, err) + + // Append 1B of data. + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} + res, err := m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // The segment that was just appended to has neither reached the maximum + // age nor maximum size to be flushed. + require.Equal(t, 1, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) + + // Wait 100ms and append some more data. 
+ time.Sleep(100 * time.Millisecond) + entries = []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} + res, err = m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // The segment has reached the maximum age and should have been moved to + // pending list to be flushed. + require.Equal(t, 0, m.available.Len()) + require.Equal(t, 1, m.pending.Len()) +} + +func TestManager_AppendMaxSize(t *testing.T) { + m, err := NewManager(Config{ + MaxAge: 30 * time.Second, + MaxSegments: 1, + MaxSegmentSize: 1024, // 1KB + }, NewMetrics(nil)) + require.NoError(t, err) + + // Append 512B of data. + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} + res, err := m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // The segment that was just appended to has neither reached the maximum + // age nor maximum size to be flushed. + require.Equal(t, 1, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) + + // Append another 512B of data. + entries = []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 512)}} + res, err = m.Append(AppendRequest{ + TenantID: "1", + Labels: lbs, + LabelsStr: lbs.String(), + Entries: entries, + }) + require.NoError(t, err) + require.NotNil(t, res) + + // The segment has reached the maximum size and should have been moved to + // pending list to be flushed. + require.Equal(t, 0, m.available.Len()) + require.Equal(t, 1, m.pending.Len()) +} + +func TestManager_AppendWALFull(t *testing.T) { + m, err := NewManager(Config{ + MaxAge: 30 * time.Second, MaxSegments: 10, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // Should be able to write to all 10 segments of 1KB each. 
- lbs := labels.Labels{{ - Name: "foo", - Value: "bar", - }} + // Should be able to write 10KB of data, 1KB per segment. + lbs := labels.Labels{{Name: "a", Value: "b"}} for i := 0; i < 10; i++ { - entries := []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("a", 1024), - }} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -119,12 +193,9 @@ func TestManager_Append_ErrFull(t *testing.T) { require.NotNil(t, res) } - // Append more data should fail as all segments are full and waiting to be - // flushed. - entries := []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("b", 1024), - }} + // However, appending more data should fail as all segments are full and + // waiting to be flushed. + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -137,26 +208,20 @@ func TestManager_Append_ErrFull(t *testing.T) { func TestManager_NextPending(t *testing.T) { m, err := NewManager(Config{ - MaxAge: DefaultMaxAge, + MaxAge: 30 * time.Second, MaxSegments: 1, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // There should be no items as no data has been written. + // There should be no segments waiting to be flushed as no data has been + // written. it := m.NextPending() require.Nil(t, it) - // Append 512B of data. There should still be no items to as the segment is - // not full (1KB). - lbs := labels.Labels{{ - Name: "foo", - Value: "bar", - }} - entries := []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("b", 512), - }} + // Append 1KB of data. 
+ lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} _, err = m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -164,49 +229,67 @@ func TestManager_NextPending(t *testing.T) { Entries: entries, }) require.NoError(t, err) + + // There should be a segment waiting to be flushed. + it = m.NextPending() + require.NotNil(t, it) + + // There should be no more segments waiting to be flushed. it = m.NextPending() require.Nil(t, it) +} - // Write another 512B of data. There should be an item waiting to be flushed. - entries = []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("b", 512), - }} - _, err = m.Append(AppendRequest{ +func TestManager_NextPendingMaxAge(t *testing.T) { + m, err := NewManager(Config{ + MaxAge: 100 * time.Millisecond, + MaxSegments: 1, + MaxSegmentSize: 1024, // 1KB + }, NewMetrics(nil)) + require.NoError(t, err) + + // Append 1B of data. + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: "c"}} + res, err := m.Append(AppendRequest{ TenantID: "1", Labels: lbs, LabelsStr: lbs.String(), Entries: entries, }) require.NoError(t, err) - it = m.NextPending() - require.NotNil(t, it) + require.NotNil(t, res) - // Should not get the same item more than once. - it = m.NextPending() + // The segment that was just appended to has neither reached the maximum + // age nor maximum size to be flushed. + it := m.NextPending() require.Nil(t, it) + require.Equal(t, 1, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) + + // Wait 100ms. The segment that was just appended to should have reached + // the maximum age. 
+ time.Sleep(100 * time.Millisecond) + it = m.NextPending() + require.NotNil(t, it) + require.Equal(t, 0, m.available.Len()) + require.Equal(t, 0, m.pending.Len()) } func TestManager_Put(t *testing.T) { m, err := NewManager(Config{ - MaxSegments: 10, + MaxAge: 30 * time.Second, + MaxSegments: 1, MaxSegmentSize: 1024, // 1KB }, NewMetrics(nil)) require.NoError(t, err) - // There should be 10 available segments, and 0 pending. - require.Equal(t, 10, m.available.Len()) + // There should be 1 available and 0 pending segments. + require.Equal(t, 1, m.available.Len()) require.Equal(t, 0, m.pending.Len()) // Append 1KB of data. - lbs := labels.Labels{{ - Name: "foo", - Value: "bar", - }} - entries := []*logproto.Entry{{ - Timestamp: time.Now(), - Line: strings.Repeat("b", 1024), - }} + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} _, err = m.Append(AppendRequest{ TenantID: "1", Labels: lbs, @@ -215,15 +298,15 @@ func TestManager_Put(t *testing.T) { }) require.NoError(t, err) - // 1 segment is full, so there should now be 9 available segments, - // and 1 pending segment. - require.Equal(t, 9, m.available.Len()) + // The segment is full, so there should now be 0 available segments and 1 + // pending segment. + require.Equal(t, 0, m.available.Len()) require.Equal(t, 1, m.pending.Len()) // Getting the pending segment should remove it from the list. it := m.NextPending() require.NotNil(t, it) - require.Equal(t, 9, m.available.Len()) + require.Equal(t, 0, m.available.Len()) require.Equal(t, 0, m.pending.Len()) // The segment should contain 1KB of data. @@ -231,7 +314,7 @@ func TestManager_Put(t *testing.T) { // Putting it back should add it to the available list. m.Put(it) - require.Equal(t, 10, m.available.Len()) + require.Equal(t, 1, m.available.Len()) require.Equal(t, 0, m.pending.Len()) // The segment should be reset. 
@@ -265,8 +348,8 @@ wal_segments_pending 0 require.NoError(t, testutil.CollectAndCompare(r, strings.NewReader(expected), metricNames...)) // Appending 1KB of data. - lbs := labels.Labels{{Name: "foo", Value: "bar"}} - entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("b", 1024)}} + lbs := labels.Labels{{Name: "a", Value: "b"}} + entries := []*logproto.Entry{{Timestamp: time.Now(), Line: strings.Repeat("c", 1024)}} _, err = m.Append(AppendRequest{ TenantID: "1", Labels: lbs,