[chore] fix nits in persistent queue implementation #7203

Merged 1 commit on Feb 16, 2023
18 changes: 6 additions & 12 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -34,12 +34,10 @@ var (
 
 // persistentQueue holds the queue backed by file storage
 type persistentQueue struct {
-	logger     *zap.Logger
-	stopWG     sync.WaitGroup
-	stopOnce   sync.Once
-	stopChan   chan struct{}
-	numWorkers int
-	storage    *persistentContiguousStorage
+	stopWG   sync.WaitGroup
+	stopOnce sync.Once
+	stopChan chan struct{}
+	storage  *persistentContiguousStorage
 }
 
 // buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
@@ -51,21 +49,17 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
 // NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
 func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
 	return &persistentQueue{
-		logger:   logger,
 		stopChan: make(chan struct{}),
 		storage:  newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
 	}
 }
 
 // StartConsumers starts the given number of consumers which will be consuming items
-func (pq *persistentQueue) StartConsumers(num int, callback func(item Request)) {
-	pq.numWorkers = num
-
-	for i := 0; i < pq.numWorkers; i++ {
+func (pq *persistentQueue) StartConsumers(numWorkers int, callback func(item Request)) {
+	for i := 0; i < numWorkers; i++ {
 		pq.stopWG.Add(1)
 		go func() {
 			defer pq.stopWG.Done()
 
 			for {
 				select {
 				case req := <-pq.storage.get():
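The change above passes the worker count directly into StartConsumers instead of caching it in a numWorkers field, since nothing reads the value after the spawn loop (the logger field is dropped for a similar reason: it was only forwarded to the storage). For background, here is a minimal, self-contained sketch of the resulting pattern; the pool and items names are hypothetical stand-ins, not the collector's actual types:

package main

import (
	"fmt"
	"sync"
)

// pool stands in for persistentQueue: the worker count is consumed by the
// spawn loop and never stored, so the struct carries no bookkeeping field.
type pool struct {
	stopWG   sync.WaitGroup
	stopChan chan struct{}
	items    chan int
}

func (p *pool) StartConsumers(numWorkers int, callback func(item int)) {
	for i := 0; i < numWorkers; i++ {
		p.stopWG.Add(1)
		go func() {
			defer p.stopWG.Done()
			for {
				select {
				case item := <-p.items:
					callback(item)
				case <-p.stopChan:
					return
				}
			}
		}()
	}
}

// Stop signals every consumer and waits for all of them to exit.
func (p *pool) Stop() {
	close(p.stopChan)
	p.stopWG.Wait()
}

func main() {
	p := &pool{stopChan: make(chan struct{}), items: make(chan int)}
	p.StartConsumers(2, func(item int) { fmt.Println("consumed", item) })
	p.items <- 1 // unbuffered sends complete only once a consumer receives
	p.items <- 2
	p.Stop()
}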
36 changes: 17 additions & 19 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -50,36 +51,36 @@ func TestPersistentQueue_Capacity(t *testing.T) {
 		t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
 
 		wq := createTestQueue(ext, 5)
-		require.Equal(t, 0, wq.Size())
+		assert.Equal(t, 0, wq.Size())
 
 		traces := newTraces(1, 10)
 		req := newFakeTracesRequest(traces)
 
 		for i := 0; i < 10; i++ {
 			result := wq.Produce(req)
 			if i < 6 {
-				require.True(t, result)
+				assert.True(t, result)
 			} else {
-				require.False(t, result)
+				assert.False(t, result)
 			}
 
 			// Let's make sure the loop picks the first element into the channel,
 			// so the capacity could be used in full
 			if i == 0 {
-				require.Eventually(t, func() bool {
+				assert.Eventually(t, func() bool {
 					return wq.Size() == 0
 				}, 5*time.Second, 10*time.Millisecond)
 			}
 		}
-		require.Equal(t, 5, wq.Size())
+		assert.Equal(t, 5, wq.Size())
 	}
 }
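A note on the require to assert switches throughout these tests: testify's require functions call t.FailNow and abort the test at the first failed check, while the assert variants record the failure and keep going, so one missed expectation does not hide the rest. A small self-contained illustration (canonical import paths shown; the repo itself uses the github.com mirror):

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestAssertVsRequire(t *testing.T) {
	assert.Equal(t, 1, 2)  // failure is recorded; execution continues
	require.Equal(t, 1, 2) // failure is recorded and the test stops here
	t.Log("never reached")
}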

 func TestPersistentQueue_Close(t *testing.T) {
 	path := t.TempDir()
 
 	ext := createStorageExtension(path)
-	t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
+	t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) })
 
 	wq := createTestQueue(ext, 1001)
 	traces := newTraces(1, 10)
@@ -91,23 +92,21 @@ func TestPersistentQueue_Close(t *testing.T) {
 		wq.Produce(req)
 	}
 	// This will close the queue very quickly, consumers might not be able to consume anything and should finish gracefully
-	require.Eventually(t, func() bool {
+	assert.NotPanics(t, func() {
 		wq.Stop()
-		return true
-	}, 5*time.Second, 10*time.Millisecond)
+	})
 	// The additional stop should not panic
-	require.Eventually(t, func() bool {
+	assert.NotPanics(t, func() {
 		wq.Stop()
-		return true
-	}, 5*time.Second, 10*time.Millisecond)
+	})
 }
 
 // Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage
 func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
 	path := t.TempDir()
 
 	ext := createStorageExtension(path)
-	t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
+	t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) })
 
 	wq := createTestQueue(ext, 1001)
 	traces := newTraces(1, 10)
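Replacing require.Eventually with assert.NotPanics above is also more accurate: Stop is expected to return promptly, so there is nothing to poll for; the property under test is that calling it, even a second time, does not panic. The double-call safety presumably comes from the stopOnce field seen in the struct earlier. A minimal sketch of that idiom with hypothetical names, not the collector's code:

package main

import (
	"fmt"
	"sync"
)

// stopper closes its channel at most once; without sync.Once the second
// close(done) would panic with "close of closed channel".
type stopper struct {
	stopOnce sync.Once
	done     chan struct{}
}

func (s *stopper) Stop() {
	s.stopOnce.Do(func() { close(s.done) })
}

func main() {
	s := &stopper{done: make(chan struct{})}
	s.Stop()
	s.Stop() // no panic: sync.Once skips the second close
	fmt.Println("stopped twice without panicking")
}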
@@ -130,11 +129,10 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
 	for i := 0; i < 1000; i++ {
 		wq.Produce(req)
 	}
-	require.Eventually(t, func() bool {
+	assert.NotPanics(t, func() {
 		wq.Stop()
-		return true
-	}, 5*time.Second, 10*time.Millisecond)
-	require.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
+	})
+	assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
 	stopStorage = fnBefore
 }
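The test above guards a shutdown-ordering invariant: consumer goroutines must finish before the backing storage is closed, or their final acknowledgements would be written to a closed client. A simplified, self-contained sketch of that ordering (queue, stopStorage, and the log lines are illustrative, not the collector's code):

package main

import (
	"fmt"
	"sync"
)

// queue is a stand-in for persistentQueue; only the shutdown path is modeled.
type queue struct {
	stopWG   sync.WaitGroup
	stopChan chan struct{}
}

func (q *queue) stopStorage() { fmt.Println("storage closed last") }

// Stop enforces the order the test checks: signal consumers, wait for them
// to finish, and only then close the backing storage.
func (q *queue) Stop() {
	close(q.stopChan)
	q.stopWG.Wait()
	q.stopStorage()
}

func main() {
	q := &queue{stopChan: make(chan struct{})}
	q.stopWG.Add(1)
	go func() {
		defer q.stopWG.Done()
		<-q.stopChan // consume until told to stop
		fmt.Println("consumer exited first")
	}()
	q.Stop()
}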

@@ -176,7 +174,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
 			tq := createTestQueue(ext, 5000)
 
 			defer tq.Stop()
-			t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
+			t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) })
 
 			numMessagesConsumed := atomic.NewInt32(0)
 			tq.StartConsumers(c.numConsumers, func(item Request) {
@@ -189,7 +187,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
 				tq.Produce(req)
 			}
 
-			require.Eventually(t, func() bool {
+			assert.Eventually(t, func() bool {
 				return c.numMessagesProduced == int(numMessagesConsumed.Load())
 			}, 5*time.Second, 10*time.Millisecond)
 		})
exporter/exporterhelper/internal/persistent_storage_batch_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"testing"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )

@@ -35,7 +36,6 @@ func TestPersistentStorageBatch_Operations(t *testing.T) {
 		setItemIndex("index", itemIndexValue).
 		setItemIndexArray("arr", itemIndexArrayValue).
 		execute(context.Background())
-
 	require.NoError(t, err)
 
 	batch, err := newBatch(ps).
@@ -45,11 +45,11 @@ func TestPersistentStorageBatch_Operations(t *testing.T) {
 
 	retrievedItemIndexValue, err := batch.getItemIndexResult("index")
 	require.NoError(t, err)
-	require.Equal(t, itemIndexValue, retrievedItemIndexValue)
+	assert.Equal(t, itemIndexValue, retrievedItemIndexValue)
 
 	retrievedItemIndexArrayValue, err := batch.getItemIndexArrayResult("arr")
 	require.NoError(t, err)
-	require.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue)
+	assert.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue)
 
 	_, err = newBatch(ps).delete("index", "arr").execute(context.Background())
 	require.NoError(t, err)
@@ -60,9 +60,9 @@ func TestPersistentStorageBatch_Operations(t *testing.T) {
 	require.NoError(t, err)
 
 	_, err = batch.getItemIndexResult("index")
-	require.Error(t, err, errValueNotSet)
+	assert.Error(t, err, errValueNotSet)
 
 	retrievedItemIndexArrayValue, err = batch.getItemIndexArrayResult("arr")
 	require.NoError(t, err)
-	require.Nil(t, retrievedItemIndexArrayValue)
+	assert.Nil(t, retrievedItemIndexArrayValue)
 }
45 changes: 13 additions & 32 deletions exporter/exporterhelper/internal/persistent_storage_test.go
@@ -23,6 +23,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 
@@ -226,12 +227,12 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
 	requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0})
 
 	// Now, this will take item 0 and pull item 1 into the unbuffered channel
-	readReq := getItemFromChannel(t, ps)
-	require.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
+	readReq := <-ps.get()
+	assert.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
 	requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1})
 
 	// This takes item 1 from channel and pulls another one (item 2) into the unbuffered channel
-	secondReadReq := getItemFromChannel(t, ps)
+	secondReadReq := <-ps.get()
 	requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1, 2})
 
 	// Lets mark item 1 as finished, it will remove it from the currently dispatched items list
@@ -242,21 +243,21 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
 	// The queue should be essentially {3,4,0,2} out of which item "3" should be pulled right away into
 	// the unbuffered channel. Check how many items are there, which, after the current one is fetched should go to 3.
 	newPs := createTestPersistentStorage(client)
-	require.Eventually(t, func() bool {
+	assert.Eventually(t, func() bool {
 		return newPs.size() == 3
 	}, 5*time.Second, 10*time.Millisecond)
 
 	requireCurrentlyDispatchedItemsEqual(t, newPs, []itemIndex{3})
 
 	// We should be able to pull all remaining items now
 	for i := 0; i < 4; i++ {
-		req := getItemFromChannel(t, newPs)
+		req := <-newPs.get()
 		req.OnProcessingFinished()
 	}
 
 	// The queue should be now empty
 	requireCurrentlyDispatchedItemsEqual(t, newPs, nil)
-	require.Eventually(t, func() bool {
+	assert.Eventually(t, func() bool {
 		return newPs.size() == 0
 	}, 5*time.Second, 10*time.Millisecond)

@@ -302,10 +303,10 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) {
 	}, 5*time.Second, 10*time.Millisecond)
 
 	// Lets read both of the elements we put
-	readReq := getItemFromChannel(t, ps)
+	readReq := <-ps.get()
 	require.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
 
-	readReq = getItemFromChannel(t, ps)
+	readReq = <-ps.get()
 	require.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
 	require.Equal(t, uint64(0), ps.size())
 
@@ -431,15 +432,6 @@ func TestPersistentStorage_StopShouldCloseClient(t *testing.T) {
 	require.Equal(t, uint64(1), castedClient.getCloseCount())
 }
 
-func getItemFromChannel(t *testing.T, pcs *persistentContiguousStorage) Request {
-	var readReq Request
-	require.Eventually(t, func() bool {
-		readReq = <-pcs.get()
-		return true
-	}, 5*time.Second, 10*time.Millisecond)
-	return readReq
-}
-
 func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguousStorage, compare []itemIndex) {
 	require.Eventually(t, func() bool {
 		pcs.mu.Lock()
@@ -448,30 +440,19 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguousStorage, compare []itemIndex) {
 	}, 5*time.Second, 10*time.Millisecond)
 }
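Deleting getItemFromChannel above is a genuine simplification, not just a style fix: a channel receive already blocks until a value is available, so wrapping it in require.Eventually layered polling on top of an operation that never needed it (the first tick would simply block inside the condition function). The bare receive the tests now use boils down to this, sketched with a plain int channel rather than the storage type:

package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered, like the storage's get() channel
	go func() { ch <- 42 }()

	v := <-ch // blocks until the producer sends; no Eventually-style polling needed
	fmt.Println(v)
}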

-type mockStorageExtension struct{}
-
-func (m mockStorageExtension) Start(_ context.Context, _ component.Host) error {
-	return nil
-}
-
-func (m mockStorageExtension) Shutdown(_ context.Context) error {
-	return nil
+type mockStorageExtension struct {
+	component.StartFunc
+	component.ShutdownFunc
 }
 
 func (m mockStorageExtension) GetClient(ctx context.Context, kind component.Kind, id component.ID, s string) (storage.Client, error) {
-	return newMockStorageClient(), nil
+	return &mockStorageClient{st: map[string][]byte{}}, nil
 }
 
 func newMockStorageExtension() storage.Extension {
 	return &mockStorageExtension{}
 }
 
-func newMockStorageClient() storage.Client {
-	return &mockStorageClient{
-		st: map[string][]byte{},
-	}
-}
-
 type mockStorageClient struct {
 	st  map[string][]byte
 	mux sync.Mutex
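Finally, the mock extension now embeds component.StartFunc and component.ShutdownFunc rather than spelling out no-op Start and Shutdown methods. In the collector these are function types whose methods treat a nil function value as a successful no-op, so embedding their zero values satisfies the interface for free. A self-contained sketch of the pattern with a simplified signature (the real StartFunc also takes a component.Host):

package main

import (
	"context"
	"fmt"
)

// StartFunc mimics the collector's pattern: a function type whose method
// treats the nil zero value as a successful no-op.
type StartFunc func(ctx context.Context) error

func (f StartFunc) Start(ctx context.Context) error {
	if f == nil {
		return nil // embedded zero value acts as a no-op implementation
	}
	return f(ctx)
}

// mockExtension gets a working Start method just by embedding StartFunc.
type mockExtension struct {
	StartFunc
}

func main() {
	m := mockExtension{}
	fmt.Println(m.Start(context.Background())) // prints <nil>
}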