diff --git a/CHANGELOG.md b/CHANGELOG.md index 34309a1f4d2..30bdee953b5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +## 🧰 Bug fixes 🧰 + +- Fix handling of corrupted records by persistent buffer (experimental) (#4475) + ## v0.40.0 Beta ## 🛑 Breaking changes 🛑 diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 2ffe743b10c..28d1f581325 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -173,7 +173,7 @@ func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Persiste if len(reqs) > 0 { errCount := 0 for _, req := range reqs { - if pcs.put(req) != nil { + if req == nil || pcs.put(req) != nil { errCount++ } } @@ -265,16 +265,20 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Persis pcs.updateReadIndex(ctx) pcs.itemDispatchingStart(ctx, index) + var req PersistentRequest batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx) - if err != nil { - return nil, false + if err == nil { + req, err = batch.getRequestResult(pcs.itemKey(index)) } - req, err := batch.getRequestResult(pcs.itemKey(index)) if err != nil || req == nil { + // We need to make sure that currently dispatched items list is cleaned + pcs.itemDispatchingFinish(ctx, index) + return nil, false } + // If all went well so far, cleanup will be handled by callback req.SetOnProcessingFinished(func() { pcs.mu.Lock() defer pcs.mu.Unlock() @@ -287,7 +291,8 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Persis } // retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage -// and moves the items back to the queue +// and moves the items back to the queue. The function returns an array which might contain nils +// if unmarshalling of the value at a given index was not possible. func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []PersistentRequest { var reqs []PersistentRequest var dispatchedItems []itemIndex @@ -339,11 +344,17 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co for i, key := range keys { req, err := retrieveBatch.getRequestResult(key) + // If error happened or item is nil, it will be efficiently ignored if err != nil { pcs.logger.Warn("Failed unmarshalling item", zap.String(zapQueueNameKey, pcs.queueName), zap.String(zapKey, key), zap.Error(err)) } else { - reqs[i] = req + if req == nil { + pcs.logger.Debug("Item value could not be retrieved", + zap.String(zapQueueNameKey, pcs.queueName), zap.String(zapKey, key), zap.Error(err)) + } else { + reqs[i] = req + } } } diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index a5803db212a..3dfcb5f2f16 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -91,7 +91,7 @@ func (bof *batchStruct) delete(keys ...string) *batchStruct { } // getResult returns the result of a Get operation for a given key using the provided unmarshal function. -// It should be called after execute +// It should be called after execute. It may return nil value func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (interface{}, error)) (interface{}, error) { op := bof.getOperations[key] if op == nil { @@ -106,16 +106,21 @@ func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (interface{ } // getRequestResult returns the result of a Get operation as a request +// If the value cannot be retrieved, it returns an error func (bof *batchStruct) getRequestResult(key string) (PersistentRequest, error) { reqIf, err := bof.getResult(key, bof.bytesToRequest) if err != nil { return nil, err } + if reqIf == nil { + return nil, errValueNotSet + } return reqIf.(PersistentRequest), nil } // getItemIndexResult returns the result of a Get operation as an itemIndex +// If the value cannot be retrieved, it returns an error func (bof *batchStruct) getItemIndexResult(key string) (itemIndex, error) { itemIndexIf, err := bof.getResult(key, bytesToItemIndex) if err != nil { @@ -130,6 +135,7 @@ func (bof *batchStruct) getItemIndexResult(key string) (itemIndex, error) { } // getItemIndexArrayResult returns the result of a Get operation as a itemIndexArray +// It may return nil value func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) { itemIndexArrIf, err := bof.getResult(key, bytesToItemIndexArray) if err != nil { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index 06258fb1d09..0ccc22590d8 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -104,6 +104,124 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } } +func TestPersistentStorage_CorruptedData(t *testing.T) { + path := createTemporaryDirectory() + defer os.RemoveAll(path) + + traces := newTraces(5, 10) + req := newFakeTracesRequest(traces) + + ext := createStorageExtension(path) + + cases := []struct { + name string + corruptAllData bool + corruptSomeData bool + corruptCurrentlyDispatchedItemsKey bool + corruptReadIndex bool + corruptWriteIndex bool + desiredQueueSize uint64 + desiredNumberOfDispatchedItems int + }{ + { + name: "corrupted no items", + corruptAllData: false, + desiredQueueSize: 2, + desiredNumberOfDispatchedItems: 1, + }, + { + name: "corrupted all items", + corruptAllData: true, + desiredQueueSize: 0, + desiredNumberOfDispatchedItems: 0, + }, + { + name: "corrupted some items", + corruptSomeData: true, + desiredQueueSize: 1, + desiredNumberOfDispatchedItems: 1, + }, + { + name: "corrupted dispatched items key", + corruptCurrentlyDispatchedItemsKey: true, + desiredQueueSize: 1, + desiredNumberOfDispatchedItems: 1, + }, + { + name: "corrupted read index", + corruptReadIndex: true, + desiredQueueSize: 0, + desiredNumberOfDispatchedItems: 1, + }, + { + name: "corrupted write index", + corruptWriteIndex: true, + desiredQueueSize: 0, + desiredNumberOfDispatchedItems: 1, + }, + { + name: "corrupted everything", + corruptAllData: true, + corruptCurrentlyDispatchedItemsKey: true, + corruptReadIndex: true, + corruptWriteIndex: true, + desiredQueueSize: 0, + desiredNumberOfDispatchedItems: 0, + }, + } + + badBytes := []byte{0, 1, 2} + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + client := createTestClient(ext) + ps := createTestPersistentStorage(client) + + ctx := context.Background() + + // Put some items, make sure they are loaded and shutdown the storage... + for i := 0; i < 3; i++ { + err := ps.put(req) + require.NoError(t, err) + } + require.Eventually(t, func() bool { + return ps.size() == 2 + }, 500*time.Millisecond, 10*time.Millisecond) + ps.stop() + + // ... so now we can corrupt data (in several ways) + if c.corruptAllData || c.corruptSomeData { + _ = client.Set(ctx, "0", badBytes) + } + if c.corruptAllData { + _ = client.Set(ctx, "1", badBytes) + _ = client.Set(ctx, "2", badBytes) + } + + if c.corruptCurrentlyDispatchedItemsKey { + _ = client.Set(ctx, currentlyDispatchedItemsKey, badBytes) + } + + if c.corruptReadIndex { + _ = client.Set(ctx, readIndexKey, badBytes) + } + + if c.corruptWriteIndex { + _ = client.Set(ctx, writeIndexKey, badBytes) + } + + // Reload + newPs := createTestPersistentStorage(client) + + require.Eventually(t, func() bool { + newPs.mu.Lock() + defer newPs.mu.Unlock() + return newPs.size() == c.desiredQueueSize && len(newPs.currentlyDispatchedItems) == c.desiredNumberOfDispatchedItems + }, 500*time.Millisecond, 10*time.Millisecond) + }) + } +} + func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { path := createTemporaryDirectory() defer os.RemoveAll(path)