Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support negative timestamps for the query engine #7212

Merged
merged 1 commit into from
Aug 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language.
- [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting.
- [#7199](https://github.com/influxdata/influxdb/pull/7199): Add mode function. Thanks @agaurav.
- [#7194](https://github.com/influxdata/influxdb/issues/7194): Support negative timestamps for the query engine.

### Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx
}
}
if opt.MinTime.IsZero() {
opt.MinTime = time.Unix(0, 0)
opt.MinTime = time.Unix(0, influxql.MinTime).UTC()
}

// Convert DISTINCT into a call.
Expand Down
8 changes: 5 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -3630,7 +3630,7 @@ func TimeRange(expr Expr) (min, max time.Time, err error) {
}

// TimeRangeAsEpochNano returns the minimum and maximum times, as epoch nano, specified by
// and expression. If there is no lower bound, the start of the epoch is returned
// an expression. If there is no lower bound, the minimum time is returned
// for minimum. If there is no higher bound, now is returned for maximum.
func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) {
tmin, tmax, err := TimeRange(expr)
Expand All @@ -3639,7 +3639,7 @@ func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) {
}

if tmin.IsZero() {
min = time.Unix(0, 0).UnixNano()
min = time.Unix(0, MinTime).UnixNano()
} else {
min = tmin.UnixNano()
}
Expand Down Expand Up @@ -3680,7 +3680,9 @@ func timeExprValue(ref Expr, lit Expr) (t time.Time, err error) {
case *TimeLiteral:
if lit.Val.After(time.Unix(0, MaxTime)) {
return time.Time{}, fmt.Errorf("time %s overflows time literal", lit.Val.Format(time.RFC3339))
} else if lit.Val.Before(time.Unix(0, MinTime)) {
} else if lit.Val.Before(time.Unix(0, MinTime+1)) {
// The minimum allowable time literal is one greater than the minimum time because the minimum time
// is a sentinel value only used internally.
return time.Time{}, fmt.Errorf("time %s underflows time literal", lit.Val.Format(time.RFC3339))
}
return lit.Val, nil
Expand Down
20 changes: 20 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,11 @@ func (itr *floatIntervalIterator) Next() (*FloatPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down Expand Up @@ -2977,6 +2982,11 @@ func (itr *integerIntervalIterator) Next() (*IntegerPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down Expand Up @@ -5135,6 +5145,11 @@ func (itr *stringIntervalIterator) Next() (*StringPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down Expand Up @@ -7293,6 +7308,11 @@ func (itr *booleanIntervalIterator) Next() (*BooleanPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down
5 changes: 5 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,11 @@ func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down
13 changes: 11 additions & 2 deletions influxql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ var ErrUnknownCall = errors.New("unknown call")

const (
// MinTime is used as the minimum time value when computing an unbounded range.
MinTime = int64(0)
// This time is one less than the MinNanoTime so that the first minimum
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the max time copied exactly and the min time decreased by one? Can you add comment to clarify why this is being done?

// time can be used as a sentinel value to signify that it is the default
// value rather than explicitly set by the user.
MinTime = models.MinNanoTime - 1

// MaxTime is used as the maximum time value when computing an unbounded range.
// This time is 2262-04-11 23:47:16.854775806 +0000 UTC
Expand Down Expand Up @@ -925,7 +928,13 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) {
t -= int64(opt.Interval.Offset)

// Truncate time by duration.
t -= t % int64(opt.Interval.Duration)
dt := t % int64(opt.Interval.Duration)
if dt < 0 {
// Negative modulo rounds up instead of down, so offset
// with the duration.
dt += int64(opt.Interval.Duration)
}
t -= dt

// Apply the offset.
start = t + int64(opt.Interval.Offset)
Expand Down
6 changes: 4 additions & 2 deletions influxql/point.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"sort"

"github.com/gogo/protobuf/proto"
internal "github.com/influxdata/influxdb/influxql/internal"
)

// ZeroTime is the Unix nanosecond timestamp for time.Time{}.
const ZeroTime = int64(-6795364578871345152)
// ZeroTime is the Unix nanosecond timestamp for no time.
// This time is not used by the query engine or the storage engine as a valid time.
const ZeroTime = int64(math.MinInt64)

// Point represents a value in a series that occurred at a given time.
type Point interface {
Expand Down
2 changes: 1 addition & 1 deletion models/points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ func TestParsePointMaxTimestamp(t *testing.T) {
}

func TestParsePointMinTimestamp(t *testing.T) {
test(t, `cpu value=1 -9223372036854775808`,
test(t, `cpu value=1 -9223372036854775806`,
NewTestPoint(
"cpu",
models.Tags{},
Expand Down
14 changes: 12 additions & 2 deletions models/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@ import (
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224192 +0000 UTC
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
MinNanoTime = int64(math.MinInt64)
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2

// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)

Expand Down
4 changes: 3 additions & 1 deletion tsdb/cursor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package tsdb

import "github.com/influxdata/influxdb/influxql"

// EOF represents a "not found" key returned by a Cursor.
const EOF = int64(-1)
const EOF = influxql.ZeroTime

// Cursor represents an iterator over a series.
type Cursor interface {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
// skip it.
if ascending && maxTime < t {
continue
// If we are descending and the min time fo the file is after where we want to start,
// If we are descending and the min time of the file is after where we want to start,
// then skip it.
} else if !ascending && minTime > t {
continue
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (d *indirectIndex) Entries(key string) []IndexEntry {
}

// The search may have returned an i == 0 which could indicated that the value
// searched should be inserted at postion 0. Make sure the key in the index
// searched should be inserted at position 0. Make sure the key in the index
// matches the search value.
if !bytes.Equal(kb, k) {
return nil
Expand Down