Skip to content

Commit

Permalink
[chore] fix nits in persistent queue implementation (#7203)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Feb 16, 2023
1 parent 69dd391 commit fbbb99e
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 68 deletions.
18 changes: 6 additions & 12 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,10 @@ var (

// persistentQueue holds the queue backed by file storage
type persistentQueue struct {
logger *zap.Logger
stopWG sync.WaitGroup
stopOnce sync.Once
stopChan chan struct{}
numWorkers int
storage *persistentContiguousStorage
stopWG sync.WaitGroup
stopOnce sync.Once
stopChan chan struct{}
storage *persistentContiguousStorage
}

// buildPersistentStorageName returns a name that is constructed out of queue name and signal type. This is done
Expand All @@ -51,21 +49,17 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
return &persistentQueue{
logger: logger,
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
}
}

// StartConsumers starts the given number of consumers which will be consuming items
func (pq *persistentQueue) StartConsumers(num int, callback func(item Request)) {
pq.numWorkers = num

for i := 0; i < pq.numWorkers; i++ {
func (pq *persistentQueue) StartConsumers(numWorkers int, callback func(item Request)) {
for i := 0; i < numWorkers; i++ {
pq.stopWG.Add(1)
go func() {
defer pq.stopWG.Done()

for {
select {
case req := <-pq.storage.get():
Expand Down
36 changes: 17 additions & 19 deletions exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -50,36 +51,36 @@ func TestPersistentQueue_Capacity(t *testing.T) {
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })

wq := createTestQueue(ext, 5)
require.Equal(t, 0, wq.Size())
assert.Equal(t, 0, wq.Size())

traces := newTraces(1, 10)
req := newFakeTracesRequest(traces)

for i := 0; i < 10; i++ {
result := wq.Produce(req)
if i < 6 {
require.True(t, result)
assert.True(t, result)
} else {
require.False(t, result)
assert.False(t, result)
}

// Let's make sure the loop picks the first element into the channel,
// so the capacity could be used in full
if i == 0 {
require.Eventually(t, func() bool {
assert.Eventually(t, func() bool {
return wq.Size() == 0
}, 5*time.Second, 10*time.Millisecond)
}
}
require.Equal(t, 5, wq.Size())
assert.Equal(t, 5, wq.Size())
}
}

func TestPersistentQueue_Close(t *testing.T) {
path := t.TempDir()

ext := createStorageExtension(path)
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) })

wq := createTestQueue(ext, 1001)
traces := newTraces(1, 10)
Expand All @@ -91,23 +92,21 @@ func TestPersistentQueue_Close(t *testing.T) {
wq.Produce(req)
}
// This will close the queue very quickly, consumers might not be able to consume anything and should finish gracefully
require.Eventually(t, func() bool {
assert.NotPanics(t, func() {
wq.Stop()
return true
}, 5*time.Second, 10*time.Millisecond)
})
// The additional stop should not panic
require.Eventually(t, func() bool {
assert.NotPanics(t, func() {
wq.Stop()
return true
}, 5*time.Second, 10*time.Millisecond)
})
}

// Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage
func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
path := t.TempDir()

ext := createStorageExtension(path)
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) })

wq := createTestQueue(ext, 1001)
traces := newTraces(1, 10)
Expand All @@ -130,11 +129,10 @@ func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
for i := 0; i < 1000; i++ {
wq.Produce(req)
}
require.Eventually(t, func() bool {
assert.NotPanics(t, func() {
wq.Stop()
return true
}, 5*time.Second, 10*time.Millisecond)
require.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
})
assert.True(t, stopStorageTime.After(lastRequestProcessedTime), "storage stop time should be after last request processed time")
stopStorage = fnBefore
}

Expand Down Expand Up @@ -176,7 +174,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
tq := createTestQueue(ext, 5000)

defer tq.Stop()
t.Cleanup(func() { require.NoError(t, ext.Shutdown(context.Background())) })
t.Cleanup(func() { assert.NoError(t, ext.Shutdown(context.Background())) })

numMessagesConsumed := atomic.NewInt32(0)
tq.StartConsumers(c.numConsumers, func(item Request) {
Expand All @@ -189,7 +187,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
tq.Produce(req)
}

require.Eventually(t, func() bool {
assert.Eventually(t, func() bool {
return c.numMessagesProduced == int(numMessagesConsumed.Load())
}, 5*time.Second, 10*time.Millisecond)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -35,7 +36,6 @@ func TestPersistentStorageBatch_Operations(t *testing.T) {
setItemIndex("index", itemIndexValue).
setItemIndexArray("arr", itemIndexArrayValue).
execute(context.Background())

require.NoError(t, err)

batch, err := newBatch(ps).
Expand All @@ -45,11 +45,11 @@ func TestPersistentStorageBatch_Operations(t *testing.T) {

retrievedItemIndexValue, err := batch.getItemIndexResult("index")
require.NoError(t, err)
require.Equal(t, itemIndexValue, retrievedItemIndexValue)
assert.Equal(t, itemIndexValue, retrievedItemIndexValue)

retrievedItemIndexArrayValue, err := batch.getItemIndexArrayResult("arr")
require.NoError(t, err)
require.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue)
assert.Equal(t, itemIndexArrayValue, retrievedItemIndexArrayValue)

_, err = newBatch(ps).delete("index", "arr").execute(context.Background())
require.NoError(t, err)
Expand All @@ -60,9 +60,9 @@ func TestPersistentStorageBatch_Operations(t *testing.T) {
require.NoError(t, err)

_, err = batch.getItemIndexResult("index")
require.Error(t, err, errValueNotSet)
assert.Error(t, err, errValueNotSet)

retrievedItemIndexArrayValue, err = batch.getItemIndexArrayResult("arr")
require.NoError(t, err)
require.Nil(t, retrievedItemIndexArrayValue)
assert.Nil(t, retrievedItemIndexArrayValue)
}
45 changes: 13 additions & 32 deletions exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

Expand Down Expand Up @@ -226,12 +227,12 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0})

// Now, this will take item 0 and pull item 1 into the unbuffered channel
readReq := getItemFromChannel(t, ps)
require.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
readReq := <-ps.get()
assert.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1})

// This takes item 1 from channel and pulls another one (item 2) into the unbuffered channel
secondReadReq := getItemFromChannel(t, ps)
secondReadReq := <-ps.get()
requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1, 2})

// Lets mark item 1 as finished, it will remove it from the currently dispatched items list
Expand All @@ -242,21 +243,21 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
// The queue should be essentially {3,4,0,2} out of which item "3" should be pulled right away into
// the unbuffered channel. Check how many items are there, which, after the current one is fetched should go to 3.
newPs := createTestPersistentStorage(client)
require.Eventually(t, func() bool {
assert.Eventually(t, func() bool {
return newPs.size() == 3
}, 5*time.Second, 10*time.Millisecond)

requireCurrentlyDispatchedItemsEqual(t, newPs, []itemIndex{3})

// We should be able to pull all remaining items now
for i := 0; i < 4; i++ {
req := getItemFromChannel(t, newPs)
req := <-newPs.get()
req.OnProcessingFinished()
}

// The queue should be now empty
requireCurrentlyDispatchedItemsEqual(t, newPs, nil)
require.Eventually(t, func() bool {
assert.Eventually(t, func() bool {
return newPs.size() == 0
}, 5*time.Second, 10*time.Millisecond)

Expand Down Expand Up @@ -302,10 +303,10 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)

// Lets read both of the elements we put
readReq := getItemFromChannel(t, ps)
readReq := <-ps.get()
require.Equal(t, req.td, readReq.(*fakeTracesRequest).td)

readReq = getItemFromChannel(t, ps)
readReq = <-ps.get()
require.Equal(t, req.td, readReq.(*fakeTracesRequest).td)
require.Equal(t, uint64(0), ps.size())

Expand Down Expand Up @@ -431,15 +432,6 @@ func TestPersistentStorage_StopShouldCloseClient(t *testing.T) {
require.Equal(t, uint64(1), castedClient.getCloseCount())
}

func getItemFromChannel(t *testing.T, pcs *persistentContiguousStorage) Request {
var readReq Request
require.Eventually(t, func() bool {
readReq = <-pcs.get()
return true
}, 5*time.Second, 10*time.Millisecond)
return readReq
}

func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguousStorage, compare []itemIndex) {
require.Eventually(t, func() bool {
pcs.mu.Lock()
Expand All @@ -448,30 +440,19 @@ func requireCurrentlyDispatchedItemsEqual(t *testing.T, pcs *persistentContiguou
}, 5*time.Second, 10*time.Millisecond)
}

type mockStorageExtension struct{}

func (m mockStorageExtension) Start(_ context.Context, _ component.Host) error {
return nil
}

func (m mockStorageExtension) Shutdown(_ context.Context) error {
return nil
type mockStorageExtension struct {
component.StartFunc
component.ShutdownFunc
}

func (m mockStorageExtension) GetClient(ctx context.Context, kind component.Kind, id component.ID, s string) (storage.Client, error) {
return newMockStorageClient(), nil
return &mockStorageClient{st: map[string][]byte{}}, nil
}

func newMockStorageExtension() storage.Extension {
return &mockStorageExtension{}
}

func newMockStorageClient() storage.Client {
return &mockStorageClient{
st: map[string][]byte{},
}
}

type mockStorageClient struct {
st map[string][]byte
mux sync.Mutex
Expand Down

0 comments on commit fbbb99e

Please sign in to comment.