From c0deae5d7933b751ae55e876ffe675e63266087a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miko=C5=82aj=20=C5=9Awi=C4=85tek?= Date: Mon, 27 Nov 2023 01:35:44 +0100 Subject: [PATCH] [exporterhelper] Fix invalid write index updates in the persistent queue (#8963) **Description:** Fixes a bug where the in-memory value of the persistent queue's write index would be updated even if writing to the storage failed. This normally wouldn't have any negative effect other than inflating the queue size temporarily, as the read loop would simply skip over the nonexistent record. However, in the case where the storage doesn't have any available space, the in-memory and in-storage write indices could diverge significantly, at which point a collector restart would leave the queue in an inconsistent state. It is worth noting that the same issue affects reading from the queue, but in that case the writes are very small, and in practice the storage will almost always have enough space to carry them out. **Link to tracking Issue:** #8115 **Testing:** The `TestPersistentQueue_StorageFull` test actually only passed by accident. Writing would leave one additional item in the put channel, then the first read would fail (as there is not enough space to do the read index and dispatched-items writes), but subsequent reads would succeed, so the bugs would cancel out. I modified this test to check for the number of items in the queue after inserting them, and also to expect one fewer item to be returned. 
--- .../fix_persistentstorage_index-updates.yaml | 25 +++++++++++++++++++ .../internal/persistent_queue.go | 17 +++++++++---- .../internal/persistent_queue_test.go | 6 +++++ 3 files changed, 43 insertions(+), 5 deletions(-) create mode 100755 .chloggen/fix_persistentstorage_index-updates.yaml diff --git a/.chloggen/fix_persistentstorage_index-updates.yaml b/.chloggen/fix_persistentstorage_index-updates.yaml new file mode 100755 index 00000000000..9b10afbaafe --- /dev/null +++ b/.chloggen/fix_persistentstorage_index-updates.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix invalid write index updates in the persistent queue + +# One or more tracking issues or pull requests related to the change +issues: [8115] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] \ No newline at end of file diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/exporterhelper/internal/persistent_queue.go index 97d5a100727..17bc2fe642e 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -215,20 +215,27 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error { } itemKey := getItemKey(pq.writeIndex) - pq.writeIndex++ + newIndex := pq.writeIndex + 1 reqBuf, err := pq.marshaler(req) if err != nil { return err } - err = pq.client.Batch(ctx, - storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex)), - storage.SetOperation(itemKey, reqBuf)) + // Carry out a transaction where we both add the item and update the write index + ops := []storage.Operation{ + storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)), + storage.SetOperation(itemKey, reqBuf), + } + if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil { + return storageErr + } + + pq.writeIndex = newIndex // Inform the loop that there's some data to process pq.putChan <- struct{}{} - return err + return nil } // getNextItem pulls the next available item from the persistent storage along with a callback function that should be diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 2e1598d9579..5fb87e6bc10 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -626,6 +626,9 @@ func TestPersistentQueue_StorageFull(t *testing.T) { reqCount++ } + // Check that the size is correct + require.Equal(t, reqCount, ps.Size(), "Size must be equal to the number of items inserted") + // Manually set the storage to only have a small amount of free space left newMaxSize := client.GetSizeInBytes() + freeSpaceInBytes client.SetMaxSizeInBytes(newMaxSize) @@ 
-634,6 +637,9 @@ func TestPersistentQueue_StorageFull(t *testing.T) { require.Error(t, ps.Offer(context.Background(), req)) // Take out all the items + // Getting the first item fails, as we can't update the state in storage, so we just delete it without returning it + // Subsequent items succeed, as deleting the first item frees enough space for the state update + reqCount-- for i := reqCount; i > 0; i-- { require.True(t, ps.Consume(func(context.Context, ptrace.Traces) {})) }