Skip to content

Commit

Permalink
Merge pull request #6556 from influxdata/jw-tsm-values
Browse files Browse the repository at this point in the history
Fix loading huge series into RAM when points are overwritten
  • Loading branch information
jwilder committed May 5, 2016
2 parents 178ed24 + a0ac754 commit fbf1e4a
Show file tree
Hide file tree
Showing 7 changed files with 1,541 additions and 104 deletions.
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (e *entry) count() int {
// filter removes all values between min and max inclusive
func (e *entry) filter(min, max int64) {
e.mu.Lock()
e.values = e.values.Filter(min, max)
e.values = e.values.Exclude(min, max)
e.mu.Unlock()
}

Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func (k *tsmKeyIterator) combine(dedup bool) blocks {

// Apply each tombstone to the block
for _, ts := range k.blocks[i].tombstones {
v = Values(v).Filter(ts.Min, ts.Max)
v = Values(v).Exclude(ts.Min, ts.Max)
}
decoded = append(decoded, v...)

Expand Down
250 changes: 245 additions & 5 deletions tsdb/engine/tsm1/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,41 @@ func (a Values) Size() int {
return sz
}

func (a Values) Filter(min, max int64) Values {
// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a Values) Merge(b Values) Values {
if len(a) == 0 {
return b
}

if len(b) == 0 {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
if av > bv {
a[i], b[j] = b[j], a[i]
} else if av == bv {
a[i] = b[j]
j++
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
}

return a
}

// Exclude returns the subset of values not in [min, max]
func (a Values) Exclude(min, max int64) Values {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
Expand All @@ -99,6 +133,20 @@ func (a Values) Filter(min, max int64) Values {
return a[:i]
}

// Include returns the subset values between min and max inclusive.
func (a Values) Include(min, max int64) Values {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() < min || a[j].UnixNano() > max {
continue
}

a[i] = a[j]
i++
}
return a[:i]
}

// Encode converts the values to a byte slice. If there are no values,
// this function panics.
func (a Values) Encode(buf []byte) ([]byte, error) {
Expand Down Expand Up @@ -367,7 +415,41 @@ func (a FloatValues) Deduplicate() FloatValues {
return other
}

func (a FloatValues) Filter(min, max int64) FloatValues {
// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a FloatValues) Merge(b FloatValues) FloatValues {
if len(a) == 0 {
return b
}

if len(b) == 0 {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
if av > bv {
a[i], b[j] = b[j], a[i]
} else if av == bv {
a[i] = b[j]
j++
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
}

return a
}

// Exclude returns the subset of values not in [min, max]
func (a FloatValues) Exclude(min, max int64) FloatValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
Expand All @@ -380,6 +462,20 @@ func (a FloatValues) Filter(min, max int64) FloatValues {
return a[:i]
}

// Include returns the subset values between min and max inclusive.
func (a FloatValues) Include(min, max int64) FloatValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() < min || a[j].UnixNano() > max {
continue
}

a[i] = a[j]
i++
}
return a[:i]
}

// Sort methods
func (a FloatValues) Len() int { return len(a) }
func (a FloatValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
Expand Down Expand Up @@ -504,7 +600,8 @@ func (a BooleanValues) Deduplicate() BooleanValues {
return other
}

func (a BooleanValues) Filter(min, max int64) BooleanValues {
// Exclude returns the subset of values not in [min, max]
func (a BooleanValues) Exclude(min, max int64) BooleanValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
Expand All @@ -517,6 +614,53 @@ func (a BooleanValues) Filter(min, max int64) BooleanValues {
return a[:i]
}

// Include returns the subset values between min and max inclusive.
func (a BooleanValues) Include(min, max int64) BooleanValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() < min || a[j].UnixNano() > max {
continue
}

a[i] = a[j]
i++
}
return a[:i]
}

// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a BooleanValues) Merge(b BooleanValues) BooleanValues {
if len(a) == 0 {
return b
}

if len(b) == 0 {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
if av > bv {
a[i], b[j] = b[j], a[i]
} else if av == bv {
a[i] = b[j]
j++
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
}

return a
}

// Sort methods
func (a BooleanValues) Len() int { return len(a) }
func (a BooleanValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
Expand Down Expand Up @@ -629,7 +773,8 @@ func (a IntegerValues) Deduplicate() IntegerValues {
return other
}

func (a IntegerValues) Filter(min, max int64) IntegerValues {
// Exclude returns the subset of values not in [min, max]
func (a IntegerValues) Exclude(min, max int64) IntegerValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
Expand All @@ -642,6 +787,53 @@ func (a IntegerValues) Filter(min, max int64) IntegerValues {
return a[:i]
}

// Include returns the subset values between min and max inclusive.
func (a IntegerValues) Include(min, max int64) IntegerValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() < min || a[j].UnixNano() > max {
continue
}

a[i] = a[j]
i++
}
return a[:i]
}

// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a IntegerValues) Merge(b IntegerValues) IntegerValues {
if len(a) == 0 {
return b
}

if len(b) == 0 {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
if av > bv {
a[i], b[j] = b[j], a[i]
} else if av == bv {
a[i] = b[j]
j++
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
}

return a
}

// Sort methods
func (a IntegerValues) Len() int { return len(a) }
func (a IntegerValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
Expand Down Expand Up @@ -756,7 +948,8 @@ func (a StringValues) Deduplicate() StringValues {
return other
}

func (a StringValues) Filter(min, max int64) StringValues {
// Exclude returns the subset of values not in [min, max]
func (a StringValues) Exclude(min, max int64) StringValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() >= min && a[j].UnixNano() <= max {
Expand All @@ -769,6 +962,53 @@ func (a StringValues) Filter(min, max int64) StringValues {
return a[:i]
}

// Include returns the subset values between min and max inclusive.
func (a StringValues) Include(min, max int64) StringValues {
var i int
for j := 0; j < len(a); j++ {
if a[j].UnixNano() < min || a[j].UnixNano() > max {
continue
}

a[i] = a[j]
i++
}
return a[:i]
}

// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a StringValues) Merge(b StringValues) StringValues {
if len(a) == 0 {
return b
}

if len(b) == 0 {
return a
}

var i, j int
for ; i < len(a) && j < len(b); i++ {
av, bv := a[i].UnixNano(), b[j].UnixNano()
if av > bv {
a[i], b[j] = b[j], a[i]
} else if av == bv {
a[i] = b[j]
j++
}
}

if i >= len(a) {
if j+1 < len(b) && b[j].UnixNano() == b[j+1].UnixNano() {
j++
}
return append(a, b[j:]...)
}

return a
}

// Sort methods
func (a StringValues) Len() int { return len(a) }
func (a StringValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
Expand Down
Loading

0 comments on commit fbf1e4a

Please sign in to comment.