Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problem with merging series that have unequal number of points in gr... #1773

Merged
merged 1 commit into from
Feb 27, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- [#1752](https://github.com/influxdb/influxdb/pull/1752): remove debug log output from collectd.
- [#1720](https://github.com/influxdb/influxdb/pull/1720): Parse Series IDs as unsigned 32-bits.
- [#1767](https://github.com/influxdb/influxdb/pull/1767): Drop Series was failing across shards. Issue #1761.
- [#1773](https://github.com/influxdb/influxdb/pull/1773): Fix bug when merging series together that have unequal number of points in a group by interval

### Features

Expand Down
10 changes: 7 additions & 3 deletions influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewPlanner(db DB) *Planner {

// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
now := p.Now()
now := p.Now().UTC()

// Clone the statement to be planned.
// Replace instances of "now()" with the current time.
Expand Down Expand Up @@ -698,7 +698,9 @@ func MapMean(itr Iterator, e *Emitter, tmin int64) {
out.Count++
out.Sum += v.(float64)
}
e.Emit(Key{tmin, itr.Tags()}, out)
if out.Count > 0 {
e.Emit(Key{tmin, itr.Tags()}, out)
}
}

type meanMapOutput struct {
Expand All @@ -714,7 +716,9 @@ func ReduceMean(key Key, values []interface{}, e *Emitter) {
out.Count += val.Count
out.Sum += val.Sum
}
e.Emit(key, out.Sum/float64(out.Count))
if out.Count > 0 {
e.Emit(key, out.Sum/float64(out.Count))
}
}

// MapMin collects the values to pass to the reducer
Expand Down
31 changes: 31 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,37 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) {
}
}

// Ensure that when merging many series together and some of them have a different number of points than others
// in a group by interval the results are correct
func TestServer_MergeManySeries(t *testing.T) {
c := NewMessagingClient()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")

for i := 1; i < 11; i++ {
for j := 1; j < 5+i%3; j++ {
tags := map[string]string{"host": fmt.Sprintf("server_%d", i)}
if index, err := s.WriteSeries("foo", "raw", []influxdb.Point{{Name: "cpu", Tags: tags, Timestamp: time.Unix(int64(j), int64(0)), Fields: map[string]interface{}{"value": float64(22)}}}); err != nil {
t.Fatalf("unexpected error: %s", err.Error())
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
}
}

results := s.ExecuteQuery(MustParseQuery(`select count(value) from cpu group by time(1s)`), "foo", nil)
if res := results.Results[0]; res.Err != nil {
t.Fatalf("unexpected error: %s", res.Err)
} else if len(res.Series) != 1 {
t.Fatalf("unexpected row count: %d", len(res.Series))
} else if s := mustMarshalJSON(res); s != `{"series":[{"name":"cpu","columns":["time","count"],"values":[["1970-01-01T00:00:01Z",10],["1970-01-01T00:00:02Z",10],["1970-01-01T00:00:03Z",10],["1970-01-01T00:00:04Z",10],["1970-01-01T00:00:05Z",7],["1970-01-01T00:00:06Z",3]]}]}` {
t.Fatalf("unexpected row(0): %s", s)
}
}

// Ensure Drop Series can:
// write to measurement cpu with tags region=uswest host=serverA
// write to measurement cpu with tags region=uswest host=serverB
Expand Down
5 changes: 4 additions & 1 deletion tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package influxdb

import (
"fmt"
"math"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -309,10 +310,12 @@ func (i *shardIterator) Tags() string { return i.tags }

func (i *shardIterator) Next() (key int64, data []byte, value interface{}) {
min := -1
minKey := int64(math.MaxInt64)

for ind, kv := range i.keyValues {
if kv.key != 0 && kv.key < i.tmax {
if kv.key != 0 && kv.key < i.tmax && kv.key < minKey {
min = ind
minKey = kv.key
}
}

Expand Down