-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: copy names from mmapped memory before closing iterator #22040
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -938,6 +938,26 @@ type MeasurementIterator interface { | |
Next() ([]byte, error) | ||
} | ||
|
||
func ToSlice(itr MeasurementIterator) ([][]byte, error) { | ||
if itr == nil { | ||
return nil, nil | ||
} else if iterator, ok := itr.(*measurementSliceIterator); ok { | ||
return iterator.ToSlice(), nil | ||
} else if iterator, ok := itr.(*fileMeasurementSliceIterator); ok { | ||
return iterator.measurementSliceIterator.ToSlice(), nil | ||
} | ||
s := make([][]byte, 0) | ||
for { | ||
if value, err := itr.Next(); err != nil { | ||
return nil, err | ||
} else if value == nil { | ||
return nil, nil | ||
} else { | ||
s = append(s, value) | ||
} | ||
} | ||
} | ||
|
||
type MeasurementIterators []MeasurementIterator | ||
|
||
func (a MeasurementIterators) Close() (err error) { | ||
|
@@ -968,6 +988,46 @@ func (itr *measurementSliceIterator) Next() (name []byte, err error) { | |
return name, nil | ||
} | ||
|
||
func (itr *measurementSliceIterator) ToSlice() [][]byte { | ||
return itr.names | ||
} | ||
|
||
// fileMeasurementSliceIterator is designed to allow a tag value slice | ||
// iterator to use memory from a memory-mapped file, pinning it | ||
// with the underlying file iterators | ||
type fileMeasurementSliceIterator struct { | ||
measurementSliceIterator | ||
fileIterators MeasurementIterators | ||
} | ||
|
||
func (itr *fileMeasurementSliceIterator) Close() error { | ||
e1 := itr.fileIterators.Close() | ||
e2 := itr.measurementSliceIterator.Close() | ||
if e1 != nil { | ||
return e1 | ||
} else { | ||
return e2 | ||
} | ||
} | ||
|
||
func newFileMeasurementSliceIterator(names [][]byte, itrs MeasurementIterators) *fileMeasurementSliceIterator { | ||
return &fileMeasurementSliceIterator{ | ||
measurementSliceIterator: measurementSliceIterator{ | ||
names: names, | ||
}, | ||
fileIterators: itrs, | ||
} | ||
} | ||
|
||
func measurementIteratorToSliceIterator(iterator MeasurementIterator) ([][]byte, *fileMeasurementSliceIterator, error) { | ||
s, err := ToSlice(iterator) | ||
if err != nil { | ||
return nil, nil, err | ||
} else { | ||
return s, newFileMeasurementSliceIterator(s, MeasurementIterators{iterator}), nil | ||
} | ||
} | ||
|
||
// MergeMeasurementIterators returns an iterator that merges a set of iterators. | ||
// Iterators that are first in the list take precedence and a deletion by those | ||
// early iterators will invalidate elements by later iterators. | ||
|
@@ -1320,17 +1380,29 @@ func (is IndexSet) DedupeInmemIndexes() IndexSet { | |
|
||
// MeasurementNamesByExpr returns a slice of measurement names matching the | ||
// provided condition. If no condition is provided then all names are returned. | ||
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { | ||
func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (names [][]byte, err error) { | ||
release := is.SeriesFile.Retain() | ||
defer release() | ||
|
||
// Return filtered list if expression exists. | ||
if expr != nil { | ||
names, err := is.measurementNamesByExpr(auth, expr) | ||
itr, err := is.measurementNamesByExpr(auth, expr) | ||
if err != nil { | ||
return nil, err | ||
} else if itr == nil { | ||
return nil, nil | ||
} | ||
defer func() { | ||
if e := itr.Close(); err == nil { | ||
err = e | ||
} | ||
}() | ||
|
||
if names, err := ToSlice(itr); err != nil { | ||
return nil, err | ||
} else { | ||
return slices.CopyChunkedByteSlices(names, 1000), nil | ||
} | ||
return slices.CopyChunkedByteSlices(names, 1000), nil | ||
} | ||
|
||
itr, err := is.measurementIterator() | ||
|
@@ -1339,10 +1411,13 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx | |
} else if itr == nil { | ||
return nil, nil | ||
} | ||
defer itr.Close() | ||
defer func() { | ||
if e := itr.Close(); err == nil { | ||
err = e | ||
} | ||
}() | ||
|
||
// Iterate over all measurements if no condition exists. | ||
var names [][]byte | ||
for { | ||
e, err := itr.Next() | ||
if err != nil { | ||
|
@@ -1360,7 +1435,7 @@ func (is IndexSet) MeasurementNamesByExpr(auth query.FineAuthorizer, expr influx | |
return slices.CopyChunkedByteSlices(names, 1000), nil | ||
} | ||
|
||
func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { | ||
func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementIterator, error) { | ||
if expr == nil { | ||
return nil, nil | ||
} | ||
|
@@ -1400,20 +1475,35 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx | |
return is.measurementNamesByTagFilter(auth, e.Op, tag.Val, value, regex) | ||
|
||
case influxql.OR, influxql.AND: | ||
lhs, err := is.measurementNamesByExpr(auth, e.LHS) | ||
|
||
exprToSlice := func(e influxql.Expr) (slice [][]byte, iterator *fileMeasurementSliceIterator, err error) { | ||
mi, err := is.measurementNamesByExpr(auth, e) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
slice, iterator, err = measurementIteratorToSliceIterator(mi) | ||
if err != nil { | ||
mi.Close() | ||
return nil, nil, err | ||
} | ||
return | ||
} | ||
|
||
ls, lsfi, err := exprToSlice(e.LHS) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because ToSlice() can fail on none-slice-based iterators. |
||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
rhs, err := is.measurementNamesByExpr(auth, e.RHS) | ||
rs, rsfi, err := exprToSlice(e.RHS) | ||
if err != nil { | ||
lsfi.Close() | ||
return nil, err | ||
} | ||
|
||
mis := MeasurementIterators{lsfi, rsfi} | ||
if e.Op == influxql.OR { | ||
return bytesutil.Union(lhs, rhs), nil | ||
return newFileMeasurementSliceIterator(bytesutil.Union(ls, rs), mis), nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similarly it might be clearer if this read like:
Also would it be clearer to name |
||
} | ||
return bytesutil.Intersect(lhs, rhs), nil | ||
return newFileMeasurementSliceIterator(bytesutil.Intersect(ls, rs), mis), nil | ||
|
||
default: | ||
return nil, fmt.Errorf("invalid tag comparison operator") | ||
|
@@ -1427,19 +1517,19 @@ func (is IndexSet) measurementNamesByExpr(auth query.FineAuthorizer, expr influx | |
} | ||
|
||
// measurementNamesByNameFilter returns matching measurement names in sorted order. | ||
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) ([][]byte, error) { | ||
func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op influxql.Token, val string, regex *regexp.Regexp) (MeasurementIterator, error) { | ||
itr, err := is.measurementIterator() | ||
if err != nil { | ||
return nil, err | ||
} else if itr == nil { | ||
return nil, nil | ||
} | ||
defer itr.Close() | ||
|
||
var names [][]byte | ||
for { | ||
e, err := itr.Next() | ||
if err != nil { | ||
itr.Close() | ||
return nil, err | ||
} else if e == nil { | ||
break | ||
|
@@ -1462,23 +1552,33 @@ func (is IndexSet) measurementNamesByNameFilter(auth query.FineAuthorizer, op in | |
} | ||
} | ||
bytesutil.Sort(names) | ||
return names, nil | ||
return newFileMeasurementSliceIterator(names, MeasurementIterators{itr}), nil | ||
} | ||
|
||
// MeasurementNamesByPredicate returns a slice of measurement names matching the | ||
// provided condition. If no condition is provided then all names are returned. | ||
// This behaves differently from MeasurementNamesByExpr because it will | ||
// return measurements using flux predicates. | ||
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { | ||
func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (names [][]byte, err error) { | ||
release := is.SeriesFile.Retain() | ||
defer release() | ||
|
||
// Return filtered list if expression exists. | ||
if expr != nil { | ||
names, err := is.measurementNamesByPredicate(auth, expr) | ||
itr, err := is.measurementNamesByPredicate(auth, expr) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if itr != nil { | ||
defer func() { | ||
if e := itr.Close(); err == nil { | ||
err = e | ||
} | ||
}() | ||
} | ||
if names, err = ToSlice(itr); err != nil { | ||
return nil, err | ||
} | ||
return slices.CopyChunkedByteSlices(names, 1000), nil | ||
} | ||
|
||
|
@@ -1488,10 +1588,13 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i | |
} else if itr == nil { | ||
return nil, nil | ||
} | ||
defer itr.Close() | ||
defer func() { | ||
if e := itr.Close(); err == nil { | ||
err = e | ||
} | ||
}() | ||
|
||
// Iterate over all measurements if no condition exists. | ||
var names [][]byte | ||
for { | ||
e, err := itr.Next() | ||
if err != nil { | ||
|
@@ -1509,7 +1612,7 @@ func (is IndexSet) MeasurementNamesByPredicate(auth query.FineAuthorizer, expr i | |
return slices.CopyChunkedByteSlices(names, 1000), nil | ||
} | ||
|
||
func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) ([][]byte, error) { | ||
func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr influxql.Expr) (MeasurementIterator, error) { | ||
if expr == nil { | ||
return nil, nil | ||
} | ||
|
@@ -1549,20 +1652,35 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i | |
return is.measurementNamesByTagPredicate(auth, e.Op, tag.Val, value, regex) | ||
|
||
case influxql.OR, influxql.AND: | ||
lhs, err := is.measurementNamesByPredicate(auth, e.LHS) | ||
predToSlice := func(e influxql.Expr) (slice [][]byte, iterator *fileMeasurementSliceIterator, err error) { | ||
mi, err := is.measurementNamesByPredicate(auth, e) | ||
if err != nil { | ||
return nil, nil, err | ||
} | ||
slice, iterator, err = measurementIteratorToSliceIterator(mi) | ||
if err != nil { | ||
mi.Close() | ||
return nil, nil, err | ||
} | ||
return | ||
} | ||
|
||
ls, litr, err := predToSlice(e.LHS) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
rhs, err := is.measurementNamesByPredicate(auth, e.RHS) | ||
rs, ritr, err := predToSlice(e.RHS) | ||
if err != nil { | ||
litr.Close() | ||
return nil, err | ||
} | ||
|
||
mis := MeasurementIterators{litr, ritr} | ||
|
||
if e.Op == influxql.OR { | ||
return bytesutil.Union(lhs, rhs), nil | ||
return newFileMeasurementSliceIterator(bytesutil.Union(ls, rs), mis), nil | ||
} | ||
return bytesutil.Intersect(lhs, rhs), nil | ||
return newFileMeasurementSliceIterator(bytesutil.Intersect(ls, rs), mis), nil | ||
|
||
default: | ||
return nil, fmt.Errorf("invalid tag comparison operator") | ||
|
@@ -1575,16 +1693,21 @@ func (is IndexSet) measurementNamesByPredicate(auth query.FineAuthorizer, expr i | |
} | ||
} | ||
|
||
func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { | ||
func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementIterator, error) { | ||
var names [][]byte | ||
failed := true | ||
|
||
mitr, err := is.measurementIterator() | ||
if err != nil { | ||
return nil, err | ||
} else if mitr == nil { | ||
return nil, nil | ||
} | ||
defer mitr.Close() | ||
defer func() { | ||
if failed { | ||
mitr.Close() | ||
} | ||
}() | ||
|
||
// valEqual determines if the provided []byte is equal to the tag value | ||
// to be filtered on. | ||
|
@@ -1699,19 +1822,25 @@ func (is IndexSet) measurementNamesByTagFilter(auth query.FineAuthorizer, op inf | |
} | ||
|
||
bytesutil.Sort(names) | ||
return names, nil | ||
failed = false | ||
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil | ||
} | ||
|
||
func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) ([][]byte, error) { | ||
func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op influxql.Token, key, val string, regex *regexp.Regexp) (MeasurementIterator, error) { | ||
var names [][]byte | ||
failed := true | ||
|
||
mitr, err := is.measurementIterator() | ||
if err != nil { | ||
return nil, err | ||
} else if mitr == nil { | ||
return nil, nil | ||
} | ||
defer mitr.Close() | ||
defer func() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've recently seen a pattern using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. E.g.:
So you're guaranteed just one of either the close or the delegation of the close succeeds. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment doesn't need to be taken for the PR, just an observation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that this pattern won't work here, because the initial There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm - I think it would work, here's a playground example. But not necessary here. https://play.golang.org/p/35qO7UPTOyG There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I see; Once works differently than a similar construct I used in another language; my mistake. |
||
if failed { | ||
mitr.Close() | ||
} | ||
}() | ||
|
||
var checkMeasurement func(auth query.FineAuthorizer, me []byte) (bool, error) | ||
switch op { | ||
|
@@ -1762,7 +1891,8 @@ func (is IndexSet) measurementNamesByTagPredicate(auth query.FineAuthorizer, op | |
} | ||
|
||
bytesutil.Sort(names) | ||
return names, nil | ||
failed = false | ||
return newFileMeasurementSliceIterator(names, MeasurementIterators{mitr}), nil | ||
} | ||
|
||
// measurementAuthorizedSeries determines if the measurement contains a series | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having both
ToSlice
andNext
on the same object seems prone to accident...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe if it was named
RemainingAsSlice
or something that suggests it is part of the iteration interfaceThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnderlyingSlice() is the new name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, UnderlyingSlice still hints to me that the slice is the whole thing instead of eaten away at by
Next
, but ok.