Skip to content

Commit

Permalink
Merge pull request #9391 from influxdata/check-series-file-key-tombst…
Browse files Browse the repository at this point in the history
…ones

Verify series key after lookup.
  • Loading branch information
benbjohnson authored Feb 5, 2018
2 parents 37374b2 + b09f350 commit 35b44cc
Showing 1 changed file with 63 additions and 34 deletions.
97 changes: 63 additions & 34 deletions tsdb/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,22 +99,30 @@ type seriesIteratorAdapter struct {
func (itr *seriesIteratorAdapter) Close() error { return itr.itr.Close() }

func (itr *seriesIteratorAdapter) Next() (SeriesElem, error) {
elem, err := itr.itr.Next()
if err != nil {
return nil, err
} else if elem.SeriesID == 0 {
return nil, nil
}
for {
elem, err := itr.itr.Next()
if err != nil {
return nil, err
} else if elem.SeriesID == 0 {
return nil, nil
}

// Skip if this key has been tombstoned.
key := itr.sfile.SeriesKey(elem.SeriesID)
if len(key) == 0 {
continue
}

name, tags := ParseSeriesKey(itr.sfile.SeriesKey(elem.SeriesID))
deleted := itr.sfile.IsDeleted(elem.SeriesID)
name, tags := ParseSeriesKey(key)
deleted := itr.sfile.IsDeleted(elem.SeriesID)

return &seriesElemAdapter{
name: name,
tags: tags,
deleted: deleted,
expr: elem.Expr,
}, nil
return &seriesElemAdapter{
name: name,
tags: tags,
deleted: deleted,
expr: elem.Expr,
}, nil
}
}

type seriesElemAdapter struct {
Expand Down Expand Up @@ -237,26 +245,34 @@ func (itr *seriesQueryAdapterIterator) Close() error {

// Next emits the next point in the iterator.
func (itr *seriesQueryAdapterIterator) Next() (*query.FloatPoint, error) {
// Read next series element.
e, err := itr.itr.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
return nil, nil
}
for {
// Read next series element.
e, err := itr.itr.Next()
if err != nil {
return nil, err
} else if e.SeriesID == 0 {
return nil, nil
}

// Skip if key has been tombstoned.
seriesKey := itr.sfile.SeriesKey(e.SeriesID)
if len(seriesKey) == 0 {
continue
}

// Convert to a key.
name, tags := ParseSeriesKey(itr.sfile.SeriesKey(e.SeriesID))
key := string(models.MakeKey(name, tags))
// Convert to a key.
name, tags := ParseSeriesKey(seriesKey)
key := string(models.MakeKey(name, tags))

// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
// Write auxiliary fields.
for i, f := range itr.opt.Aux {
switch f.Val {
case "key":
itr.point.Aux[i] = key
}
}
return &itr.point, nil
}
return &itr.point, nil
}

// filterUndeletedSeriesIDIterator returns all series which are not deleted.
Expand Down Expand Up @@ -744,7 +760,12 @@ func (itr *seriesPointIterator) readSeriesKeys(name []byte) error {
} else if elem.SeriesID == 0 {
break
}
itr.keys = append(itr.keys, itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID))

key := itr.indexSet.SeriesFile.SeriesKey(elem.SeriesID)
if len(key) == 0 {
continue
}
itr.keys = append(itr.keys, key)
}

// Sort keys.
Expand Down Expand Up @@ -1719,7 +1740,9 @@ func (is IndexSet) MeasurementSeriesKeysByExpr(name []byte, expr influxql.Expr)
}

seriesKey := is.SeriesFile.SeriesKey(e.SeriesID)
assert(seriesKey != nil, fmt.Sprintf("series key for ID: %d not found", e.SeriesID))
if len(seriesKey) == 0 {
continue
}

name, tags := ParseSeriesKey(seriesKey)
keys = append(keys, models.MakeKey(name, tags))
Expand Down Expand Up @@ -2145,7 +2168,7 @@ func (is IndexSet) tagValuesByKeyAndExpr(auth query.Authorizer, name []byte, key
}

buf := is.SeriesFile.SeriesKey(e.SeriesID)
if buf == nil {
if len(buf) == 0 {
continue
}

Expand Down Expand Up @@ -2305,7 +2328,13 @@ func (is IndexSet) TagSets(sfile *SeriesFile, name []byte, opt query.IteratorOpt
break
}

_, tags := ParseSeriesKey(sfile.SeriesKey(e.SeriesID))
// Skip if the series has been tombstoned.
key := sfile.SeriesKey(e.SeriesID)
if len(key) == 0 {
continue
}

_, tags := ParseSeriesKey(key)
if opt.Authorizer != nil && !opt.Authorizer.AuthorizeSeriesRead(is.Database(), name, tags) {
continue
}
Expand Down

0 comments on commit 35b44cc

Please sign in to comment.