From 8e0d75cfdf2be04fcd461a36dd4fcc3fec0106c4 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 15 Mar 2018 12:24:53 +0000 Subject: [PATCH] Fix data race in WAL This commit fixes a data race in the WAL, which can occur when writes and deletes are being executed concurrently. The WAL uses a buffer pool of `[]byte` when reading the WAL. WAL entries are unmarshaled into these buffers and passed along to the relevant methods handling the different types of entry (write, delete etc). In the case of deletes, the keys that need to be deleted were being stored for later processing, however these keys were part of the backing array of initial buffer from the pool. As such, those keys could be written to at a future time when handling other parts of the WAL. --- .hooks/pre-commit | 0 tsdb/engine/tsm1/wal.go | 15 ++++++++++-- tsdb/engine/tsm1/wal_test.go | 47 ++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 2 deletions(-) mode change 100755 => 100644 .hooks/pre-commit diff --git a/.hooks/pre-commit b/.hooks/pre-commit old mode 100755 new mode 100644 diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index c5ada3cb1d1..e0d09fc5d36 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -901,7 +901,14 @@ func (w *DeleteWALEntry) MarshalBinary() ([]byte, error) { // UnmarshalBinary deserializes the byte slice into w. func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error { - w.Keys = bytes.Split(b, []byte("\n")) + if len(b) == 0 { + return nil + } + + // b originates from a pool. Copy what needs to be retained. + buf := make([]byte, len(b)) + copy(buf, b) + w.Keys = bytes.Split(buf, []byte("\n")) return nil } @@ -977,7 +984,11 @@ func (w *DeleteRangeWALEntry) UnmarshalBinary(b []byte) error { if i+sz > len(b) { return ErrWALCorrupt } - w.Keys = append(w.Keys, b[i:i+sz]) + + // b originates from a pool. Copy what needs to be retained. + buf := make([]byte, sz) + copy(buf, b[i:i+sz]) + w.Keys = append(w.Keys, buf) i += sz } return nil diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go index 34c06c3a0cc..76e66ac921a 100644 --- a/tsdb/engine/tsm1/wal_test.go +++ b/tsdb/engine/tsm1/wal_test.go @@ -4,9 +4,11 @@ import ( "fmt" "io" "os" + "reflect" "testing" "github.com/golang/snappy" + "github.com/influxdata/influxdb/pkg/slices" "github.com/influxdata/influxdb/tsdb/engine/tsm1" ) @@ -685,6 +687,51 @@ func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) { } } +func TestDeleteWALEntry_UnmarshalBinary(t *testing.T) { + examples := []struct { + In []string + Out [][]byte + }{ + { + In: []string{""}, + Out: nil, + }, + { + In: []string{"foo"}, + Out: [][]byte{[]byte("foo")}, + }, + { + In: []string{"foo", "bar"}, + Out: [][]byte{[]byte("foo"), []byte("bar")}, + }, + { + In: []string{"foo", "bar", "z", "abc"}, + Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("abc")}, + }, + { + In: []string{"foo", "bar", "z", "a"}, + Out: [][]byte{[]byte("foo"), []byte("bar"), []byte("z"), []byte("a")}, + }, + } + + for i, example := range examples { + w := &tsm1.DeleteWALEntry{Keys: slices.StringsToBytes(example.In...)} + b, err := w.MarshalBinary() + if err != nil { + t.Fatalf("[example %d] unexpected error, got %v", i, err) + } + + out := &tsm1.DeleteWALEntry{} + if err := out.UnmarshalBinary(b); err != nil { + t.Fatalf("[example %d] %v", i, err) + } + + if !reflect.DeepEqual(example.Out, out.Keys) { + t.Errorf("[example %d] got %v, expected %v", i, out.Keys, example.Out) + } + } +} + func TestWriteWALSegment_UnmarshalBinary_DeleteWALCorrupt(t *testing.T) { w := &tsm1.DeleteWALEntry{ Keys: [][]byte{[]byte("foo"), []byte("bar")},