Skip to content

Commit

Permalink
Merge pull request #1773 from influxdb/race-condition-when-merging-mu…
Browse files Browse the repository at this point in the history
…ltiple-series

Fix problem with merging series that have unequal number of points in gr...
  • Loading branch information
pauldix committed Feb 27, 2015
2 parents 77d0120 + 889f0a3 commit 48294ce
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#1752](https://github.com/influxdb/influxdb/pull/1752): remove debug log output from collectd.
- [#1720](https://github.com/influxdb/influxdb/pull/1720): Parse Series IDs as unsigned 32-bits.
- [#1767](https://github.com/influxdb/influxdb/pull/1767): Drop Series was failing across shards. Issue #1761.
- [#1773](https://github.com/influxdb/influxdb/pull/1773): Fix bug when merging series together that have unequal number of points in a group by interval

### Features

Expand Down
10 changes: 7 additions & 3 deletions influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewPlanner(db DB) *Planner {

// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
now := p.Now()
now := p.Now().UTC()

// Clone the statement to be planned.
// Replace instances of "now()" with the current time.
Expand Down Expand Up @@ -698,7 +698,9 @@ func MapMean(itr Iterator, e *Emitter, tmin int64) {
out.Count++
out.Sum += v.(float64)
}
e.Emit(Key{tmin, itr.Tags()}, out)
if out.Count > 0 {
e.Emit(Key{tmin, itr.Tags()}, out)
}
}

type meanMapOutput struct {
Expand All @@ -714,7 +716,9 @@ func ReduceMean(key Key, values []interface{}, e *Emitter) {
out.Count += val.Count
out.Sum += val.Sum
}
e.Emit(key, out.Sum/float64(out.Count))
if out.Count > 0 {
e.Emit(key, out.Sum/float64(out.Count))
}
}

// MapMin collects the values to pass to the reducer
Expand Down
31 changes: 31 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,37 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
}
}

// Ensure that when merging many series together and some of them have a different number of points than others
// in a group by interval the results are correct
func TestServer_MergeManySeries(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")

for i := 1; i < 11; i++ {
for j := 1; j < 5+i%3; j++ {
tags := map[string]string{"host": fmt.Sprintf("server_%d", i)}
if index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: time.Unix(int64(j), int64(0)), Fields: map[string]interface{}{"value": float64(22)}}}); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
}
}

results := s.ExecuteQuery(MustParseQuery(`select count(value) from cpu group by time(1s)`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:01Z",10],["1970-01-01T00:00:02Z",10],["1970-01-01T00:00:03Z",10],["1970-01-01T00:00:04Z",10],["1970-01-01T00:00:05Z",7],["1970-01-01T00:00:06Z",3]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}

// Ensure Drop Series can:
// write to measurement cpu with tags region=uswest host=serverA
// write to measurement cpu with tags region=uswest host=serverB
Expand Down
5 changes: 4 additions & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package influxdb

import (
"fmt"
"math"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -309,10 +310,12 @@ func (i *shardIterator) Tags() string { return i.tags }

func (i *shardIterator) Next() (key int64, data []byte, value interface{}) {
min := -1
minKey := int64(math.MaxInt64)

for ind, kv := range i.keyValues {
if kv.key != 0 && kv.key < i.tmax {
if kv.key != 0 && kv.key < i.tmax && kv.key < minKey {
min = ind
minKey = kv.key
}
}

Expand Down

0 comments on commit 48294ce

Please sign in to comment.