Skip to content

Commit

Permalink
fix: copy names from mmapped memory before closing iterator (#22040) (#…
Browse files Browse the repository at this point in the history
…22059)

This fix ensures that memory-mapped files are not released
before pointers into them are copied into heap memory.
MeasurementNamesByExpr() and MeasurementNamesByPredicate() can
cause panics by copying memory from mmapped files that have been
released. The functions they call use iterators to files which
are closed (releasing the mmapped files) before the memory is
safely copied to the heap.

closes #22000

(cherry picked from commit a989f8f)

closes #22002
  • Loading branch information
davidby-influx authored Aug 4, 2021
1 parent 2339432 commit 16b737d
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21946](https://github.com/influxdata/influxdb/pull/21946): Prevent silently dropped writes when there are overlapping shards.
1. [21950](https://github.com/influxdata/influxdb/pull/21950): Invalid requests to /api/v2 subroutes now return 404 instead of a list of links.
1. [21962](https://github.com/influxdata/influxdb/pull/21962): Flux metaqueries for `_field` take fast path if `_measurement` is the only predicate.
1. [22059](https://github.com/influxdata/influxdb/pull/22059): Copy names from mmapped memory before closing iterator

## v2.0.7 [2021-06-04]

Expand Down
123 changes: 98 additions & 25 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,11 @@ func (a MeasurementIterators) Close() (err error) {
return err
}

type MeasurementSliceIterator interface {
MeasurementIterator
UnderlyingSlice() [][]byte
}

type measurementSliceIterator struct {
names [][]byte
}
Expand All @@ -981,6 +986,37 @@ func (itr *measurementSliceIterator) Next() (name []byte, err error) {
return name, nil
}

func (itr *measurementSliceIterator) UnderlyingSlice() [][]byte {
return itr.names
}

// fileMeasurementSliceIterator is designed to allow a tag value slice
// iterator to use memory from a memory-mapped file, pinning it
// with the underlying file iterators
type fileMeasurementSliceIterator struct {
measurementSliceIterator
fileIterators MeasurementIterators
}

func (itr *fileMeasurementSliceIterator) Close() error {
e1 := itr.fileIterators.Close()
e2 := itr.measurementSliceIterator.Close()
if e1 != nil {
return e1
} else {
return e2
}
}

func newFileMeasurementSliceIterator(names [][]byte, itrs MeasurementIterators) *fileMeasurementSliceIterator {
return &fileMeasurementSliceIterator{
measurementSliceIterator: measurementSliceIterator{
names: names,
},
fileIterators: itrs,
}
}

// MergeMeasurementIterators returns an iterator that merges a set of iterators.
// Iterators that are first in the list take precedence and a deletion by those
// early iterators will invalidate elements by later iterators.
Expand Down Expand Up @@ -1307,11 +1343,18 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E

// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByExpr(auth, expr)
itr, err := is.measurementNamesByExpr(auth, expr)
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
return slices.CopyChunkedByteSlices(names, 1000), nil
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil
}

itr, err := is.measurementIterator()
Expand All @@ -1320,10 +1363,14 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E
} else if itr == nil {
return nil, nil
}
defer itr.Close()
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()

// Iterate over all measurements if no condition exists.
var names [][]byte
// Iterate over all measurements if no condition exists.
for {
e, err := itr.Next()
if err != nil {
Expand All @@ -1341,7 +1388,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.Authorizer, expr influxql.E
return slices.CopyChunkedByteSlices(names, 1000), nil
}

func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.Expr) (MeasurementSliceIterator, error) {
if expr == nil {
return nil, nil
}
Expand Down Expand Up @@ -1381,20 +1428,22 @@ func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.E
return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex)

case influxql.OR, influxql.AND:

lhs, err := is.measurementNamesByExpr(auth, e.LHS)
if err != nil {
return nil, err
}

rhs, err := is.measurementNamesByExpr(auth, e.RHS)
if err != nil {
lhs.Close()
return nil, err
}

mis := MeasurementIterators{lhs, rhs}
if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
}
return bytesutil.Intersect(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil

default:
return nil, fmt.Errorf("invalid tag comparison operator")
Expand All @@ -1408,19 +1457,19 @@ func (is IndexSet) measurementNamesByExpr(auth query.Authorizer, expr influxql.E
}

// measurementNamesByNameFilter returns matching measurement names in sorted order.
func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influxql.Token, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
itr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if itr == nil {
return nil, nil
}
defer itr.Close()

var names [][]byte
for {
e, err := itr.Next()
if err != nil {
itr.Close()
return nil, err
} else if e == nil {
break
Expand All @@ -1443,7 +1492,7 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.Authorizer, op influx
}
}
bytesutil.Sort(names)
return names, nil
return newFileMeasurementSliceIterator(names, MeasurementIterators{itr}), nil
}

// MeasurementNamesByPredicate returns a slice of measurement names matching the
Expand All @@ -1456,11 +1505,18 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influ

// Return filtered list if expression exists.
if expr != nil {
names, err := is.measurementNamesByPredicate(auth, expr)
itr, err := is.measurementNamesByPredicate(auth, expr)
if err != nil {
return nil, err
}
return slices.CopyChunkedByteSlices(names, 1000), nil
if itr != nil {
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()
}
return slices.CopyChunkedByteSlices(itr.UnderlyingSlice(), 1000), nil
}

itr, err := is.measurementIterator()
Expand All @@ -1469,10 +1525,14 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influ
} else if itr == nil {
return nil, nil
}
defer itr.Close()
defer func() {
if e := itr.Close(); err == nil {
err = e
}
}()

// Iterate over all measurements if no condition exists.
var names [][]byte
// Iterate over all measurements if no condition exists.
for {
e, err := itr.Next()
if err != nil {
Expand All @@ -1490,7 +1550,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.Authorizer, expr influ
return slices.CopyChunkedByteSlices(names, 1000), nil
}

func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) ([][]byte, error) {
func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influxql.Expr) (MeasurementSliceIterator, error) {
if expr == nil {
return nil, nil
}
Expand Down Expand Up @@ -1534,16 +1594,17 @@ func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influ
if err != nil {
return nil, err
}

rhs, err := is.measurementNamesByPredicate(auth, e.RHS)
if err != nil {
lhs.Close()
return nil, err
}
mis := MeasurementIterators{lhs, rhs}

if e.Op == influxql.OR {
return bytesutil.Union(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Union(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil
}
return bytesutil.Intersect(lhs, rhs), nil
return newFileMeasurementSliceIterator(bytesutil.Intersect(lhs.UnderlyingSlice(), rhs.UnderlyingSlice()), mis), nil

default:
return nil, fmt.Errorf("invalid tag comparison operator")
Expand All @@ -1556,16 +1617,21 @@ func (is IndexSet) measurementNamesByPredicate(auth query.Authorizer, expr influ
}
}

func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
var names [][]byte
failed := true

mitr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
defer func() {
if failed {
mitr.Close()
}
}()

// valEqual determines if the provided []byte is equal to the tag value
// to be filtered on.
Expand Down Expand Up @@ -1680,19 +1746,25 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.Authorizer, op influxq
}

bytesutil.Sort(names)
return names, nil
failed = false
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil
}

func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) {
func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementSliceIterator, error) {
var names [][]byte
failed := true

mitr, err := is.measurementIterator()
if err != nil {
return nil, err
} else if mitr == nil {
return nil, nil
}
defer mitr.Close()
defer func() {
if failed {
mitr.Close()
}
}()

var checkMeasurement func(auth query.Authorizer, me []byte) (bool, error)
switch op {
Expand Down Expand Up @@ -1743,7 +1815,8 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.Authorizer, op infl
}

bytesutil.Sort(names)
return names, nil
failed = false
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil
}

// measurementAuthorizedSeries determines if the measurement contains a series
Expand Down

0 comments on commit 16b737d

Please sign in to comment.