Skip to content

Commit

Permalink
Handle corrupted persistent storage data gracefully (#4475)
Browse files Browse the repository at this point in the history
This fixes a bug in how the persistent buffer handles corrupted data. In principle, when corrupted data is encountered, it should be handled gracefully. This fix ensures that happens for several scenarios. The change also adds several test cases.

The original bug was found by @pmalek-sumo and manifested as the following:

```
2021-11-24T11:53:33.753Z    info    internal/persistent_storage.go:309    Fetching items left for dispatch by consumers    {"kind": "exporter", "name": "otlphttp/containers", "queueName": "otlphttp/containers-logs", "numberOfItems": 2}
panic: interface conversion: interface is nil, not internal.PersistentRequest

goroutine 1 [running]:
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*batchStruct).getRequestResult(0xc001653038, {0x4e937b8, 0x1})
    go.opentelemetry.io/[email protected]/exporter/exporterhelper/internal/persistent_storage_batch.go:115 +0x65
go.opentelemetry.io/collector/exporter/exporterhelper/internal.(*persistentContiguousStorage).retrieveNotDispatchedReqs(0xc001068140, {0x567dda0, 0xc000078030})
    go.opentelemetry.io/[email protected]/exporter/exporterhelper/internal/persistent_storage.go:341 +0xf6d
go.opentelemetry.io/collector/exporter/exporterhelper/internal.newPersistentContiguousStorage({0x567dda0, 0xc000078030}, {0xc000fde040, 0x19}, 0x1388, 0xc001006960, {0x56aedf8, 0xc00000ebf0}, 0xc00100e630)
    go.opentelemetry.io/[email protected]/exporter/exporterhelper/internal/persistent_storage.go:124 +0x1e5
go.opentelemetry.io/collector/exporter/exporterhelper/internal.NewPersistentQueue(...)
    go.opentelemetry.io/[email protected]/exporter/exporterhelper/internal/persistent_queue.go:44
go.opentelemetry.io/collector/exporter/exporterhelper.
```

**Link to tracking Issue:** N/A

**Testing:** Unit tests added

**Documentation:** N/A
  • Loading branch information
pmm-sumo authored Nov 29, 2021
1 parent 5bfd860 commit 96a882a
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

## 🧰 Bug fixes 🧰

- Fix handling of corrupted records by persistent buffer (experimental) (#4475)

## v0.40.0 Beta

## 🛑 Breaking changes 🛑
Expand Down
23 changes: 17 additions & 6 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Persiste
if len(reqs) > 0 {
errCount := 0
for _, req := range reqs {
if pcs.put(req) != nil {
if req == nil || pcs.put(req) != nil {
errCount++
}
}
Expand Down Expand Up @@ -265,16 +265,20 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Persis
pcs.updateReadIndex(ctx)
pcs.itemDispatchingStart(ctx, index)

var req PersistentRequest
batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx)
if err != nil {
return nil, false
if err == nil {
req, err = batch.getRequestResult(pcs.itemKey(index))
}

req, err := batch.getRequestResult(pcs.itemKey(index))
if err != nil || req == nil {
// We need to make sure that currently dispatched items list is cleaned
pcs.itemDispatchingFinish(ctx, index)

return nil, false
}

// If all went well so far, cleanup will be handled by callback
req.SetOnProcessingFinished(func() {
pcs.mu.Lock()
defer pcs.mu.Unlock()
Expand All @@ -287,7 +291,8 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Persis
}

// retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items back to the queue
// and moves the items back to the queue. The function returns a slice which might contain nils
// if unmarshalling of the value at a given index was not possible.
func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []PersistentRequest {
var reqs []PersistentRequest
var dispatchedItems []itemIndex
Expand Down Expand Up @@ -339,11 +344,17 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co

for i, key := range keys {
req, err := retrieveBatch.getRequestResult(key)
// If an error happened or the item is nil, it will be effectively ignored
if err != nil {
pcs.logger.Warn("Failed unmarshalling item",
zap.String(zapQueueNameKey, pcs.queueName), zap.String(zapKey, key), zap.Error(err))
} else {
reqs[i] = req
if req == nil {
pcs.logger.Debug("Item value could not be retrieved",
zap.String(zapQueueNameKey, pcs.queueName), zap.String(zapKey, key), zap.Error(err))
} else {
reqs[i] = req
}
}
}

Expand Down
8 changes: 7 additions & 1 deletion exporter/exporterhelper/internal/persistent_storage_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (bof *batchStruct) delete(keys ...string) *batchStruct {
}

// getResult returns the result of a Get operation for a given key using the provided unmarshal function.
// It should be called after execute
// It should be called after execute. It may return a nil value
func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (interface{}, error)) (interface{}, error) {
op := bof.getOperations[key]
if op == nil {
Expand All @@ -106,16 +106,21 @@ func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (interface{
}

// getRequestResult returns the result of a Get operation as a request
// If the value cannot be retrieved, it returns an error
func (bof *batchStruct) getRequestResult(key string) (PersistentRequest, error) {
reqIf, err := bof.getResult(key, bof.bytesToRequest)
if err != nil {
return nil, err
}
if reqIf == nil {
return nil, errValueNotSet
}

return reqIf.(PersistentRequest), nil
}

// getItemIndexResult returns the result of a Get operation as an itemIndex
// If the value cannot be retrieved, it returns an error
func (bof *batchStruct) getItemIndexResult(key string) (itemIndex, error) {
itemIndexIf, err := bof.getResult(key, bytesToItemIndex)
if err != nil {
Expand All @@ -130,6 +135,7 @@ func (bof *batchStruct) getItemIndexResult(key string) (itemIndex, error) {
}

// getItemIndexArrayResult returns the result of a Get operation as a itemIndexArray
// It may return a nil value
func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) {
itemIndexArrIf, err := bof.getResult(key, bytesToItemIndexArray)
if err != nil {
Expand Down
118 changes: 118 additions & 0 deletions exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,124 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler {
}
}

// TestPersistentStorage_CorruptedData checks that the persistent storage starts up
// cleanly even when previously stored records are corrupted. Each case corrupts a
// different subset of the stored keys (item payloads, the currently-dispatched-items
// list, the read index, the write index) and asserts the queue size and number of
// dispatched items observed after the storage is reloaded.
func TestPersistentStorage_CorruptedData(t *testing.T) {
	path := createTemporaryDirectory()
	defer os.RemoveAll(path)

	traces := newTraces(5, 10)
	req := newFakeTracesRequest(traces)

	ext := createStorageExtension(path)

	testCases := []struct {
		name                               string
		corruptAllData                     bool
		corruptSomeData                    bool
		corruptCurrentlyDispatchedItemsKey bool
		corruptReadIndex                   bool
		corruptWriteIndex                  bool
		desiredQueueSize                   uint64
		desiredNumberOfDispatchedItems     int
	}{
		{
			name:                           "corrupted no items",
			corruptAllData:                 false,
			desiredQueueSize:               2,
			desiredNumberOfDispatchedItems: 1,
		},
		{
			name:                           "corrupted all items",
			corruptAllData:                 true,
			desiredQueueSize:               0,
			desiredNumberOfDispatchedItems: 0,
		},
		{
			name:                           "corrupted some items",
			corruptSomeData:                true,
			desiredQueueSize:               1,
			desiredNumberOfDispatchedItems: 1,
		},
		{
			name:                               "corrupted dispatched items key",
			corruptCurrentlyDispatchedItemsKey: true,
			desiredQueueSize:                   1,
			desiredNumberOfDispatchedItems:     1,
		},
		{
			name:                           "corrupted read index",
			corruptReadIndex:               true,
			desiredQueueSize:               0,
			desiredNumberOfDispatchedItems: 1,
		},
		{
			name:                           "corrupted write index",
			corruptWriteIndex:              true,
			desiredQueueSize:               0,
			desiredNumberOfDispatchedItems: 1,
		},
		{
			name:                               "corrupted everything",
			corruptAllData:                     true,
			corruptCurrentlyDispatchedItemsKey: true,
			corruptReadIndex:                   true,
			corruptWriteIndex:                  true,
			desiredQueueSize:                   0,
			desiredNumberOfDispatchedItems:     0,
		},
	}

	corruptedBytes := []byte{0, 1, 2}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			client := createTestClient(ext)
			ps := createTestPersistentStorage(client)

			ctx := context.Background()

			// Seed the storage with a few items, wait until one is picked up for
			// dispatch, then shut it down so the stored data can be tampered with.
			for i := 0; i < 3; i++ {
				require.NoError(t, ps.put(req))
			}
			require.Eventually(t, func() bool {
				return ps.size() == 2
			}, 500*time.Millisecond, 10*time.Millisecond)
			ps.stop()

			// Corrupt the subset of keys selected by the test case.
			if tc.corruptAllData || tc.corruptSomeData {
				_ = client.Set(ctx, "0", corruptedBytes)
			}
			if tc.corruptAllData {
				_ = client.Set(ctx, "1", corruptedBytes)
				_ = client.Set(ctx, "2", corruptedBytes)
			}
			if tc.corruptCurrentlyDispatchedItemsKey {
				_ = client.Set(ctx, currentlyDispatchedItemsKey, corruptedBytes)
			}
			if tc.corruptReadIndex {
				_ = client.Set(ctx, readIndexKey, corruptedBytes)
			}
			if tc.corruptWriteIndex {
				_ = client.Set(ctx, writeIndexKey, corruptedBytes)
			}

			// Reload the storage and verify it settles at the expected state.
			reloaded := createTestPersistentStorage(client)
			require.Eventually(t, func() bool {
				reloaded.mu.Lock()
				defer reloaded.mu.Unlock()
				return reloaded.size() == tc.desiredQueueSize && len(reloaded.currentlyDispatchedItems) == tc.desiredNumberOfDispatchedItems
			}, 500*time.Millisecond, 10*time.Millisecond)
		})
	}
}

func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) {
path := createTemporaryDirectory()
defer os.RemoveAll(path)
Expand Down

0 comments on commit 96a882a

Please sign in to comment.