Skip to content

Commit

Permalink
Support negative timestamps in the query engine
Browse files Browse the repository at this point in the history
Negative timestamps are now supported. We also now refuse two
nanoseconds that are at the edge of the minimum time window. One of the
nanoseconds we do not accept is because we need MinInt64 to be used for
some internal comparisons in the TSM engine and it was causing an
underflow when we subtracted one from the minimum time. The second is so
we can have one minimum time that signifies the default minimum that
nobody can write to (so we can implicitly rewrite the timestamp on
aggregate queries) but still use the explicit timestamp if it is given
to us by the user. We aren't able to tell the difference between if the
user provided it or if it was implicit without those values being
different.

If the default minimum time is used with an aggregate query, we rewrite
the time to be the epoch for backwards compatibility since we believe
that's more important than supporting that extra nanosecond.
  • Loading branch information
jsternberg committed Aug 25, 2016
1 parent 5130cd7 commit 10029ca
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ With this release the systemd configuration files for InfluxDB will use the syst
- [#7605](https://github.com/influxdata/influxdb/issues/7605): Remove IF EXISTS/IF NOT EXISTS from influxql language.
- [#7095](https://github.com/influxdata/influxdb/pull/7095): Add MaxSeriesPerDatabase config setting.
- [#7199](https://github.com/influxdata/influxdb/pull/7199): Add mode function. Thanks @agaurav.
- [#7194](https://github.com/influxdata/influxdb/issues/7194): Support negative timestamps for the query engine.

### Bugfixes

Expand Down
2 changes: 1 addition & 1 deletion coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (e *StatementExecutor) createIterators(stmt *influxql.SelectStatement, ctx
}
}
if opt.MinTime.IsZero() {
opt.MinTime = time.Unix(0, 0)
opt.MinTime = time.Unix(0, influxql.MinTime).UTC()
}

// Convert DISTINCT into a call.
Expand Down
8 changes: 5 additions & 3 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -3630,7 +3630,7 @@ func TimeRange(expr Expr) (min, max time.Time, err error) {
}

// TimeRangeAsEpochNano returns the minimum and maximum times, as epoch nano, specified by
// and expression. If there is no lower bound, the start of the epoch is returned
// an expression. If there is no lower bound, the minimum time is returned
// for minimum. If there is no higher bound, now is returned for maximum.
func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) {
tmin, tmax, err := TimeRange(expr)
Expand All @@ -3639,7 +3639,7 @@ func TimeRangeAsEpochNano(expr Expr) (min, max int64, err error) {
}

if tmin.IsZero() {
min = time.Unix(0, 0).UnixNano()
min = time.Unix(0, MinTime).UnixNano()
} else {
min = tmin.UnixNano()
}
Expand Down Expand Up @@ -3680,7 +3680,9 @@ func timeExprValue(ref Expr, lit Expr) (t time.Time, err error) {
case *TimeLiteral:
if lit.Val.After(time.Unix(0, MaxTime)) {
return time.Time{}, fmt.Errorf("time %s overflows time literal", lit.Val.Format(time.RFC3339))
} else if lit.Val.Before(time.Unix(0, MinTime)) {
} else if lit.Val.Before(time.Unix(0, MinTime+1)) {
// The minimum allowable time literal is one greater than the minimum time because the minimum time
// is a sentinel value only used internally.
return time.Time{}, fmt.Errorf("time %s underflows time literal", lit.Val.Format(time.RFC3339))
}
return lit.Val, nil
Expand Down
20 changes: 20 additions & 0 deletions influxql/iterator.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,11 @@ func (itr *floatIntervalIterator) Next() (*FloatPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down Expand Up @@ -2977,6 +2982,11 @@ func (itr *integerIntervalIterator) Next() (*IntegerPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down Expand Up @@ -5135,6 +5145,11 @@ func (itr *stringIntervalIterator) Next() (*StringPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down Expand Up @@ -7293,6 +7308,11 @@ func (itr *booleanIntervalIterator) Next() (*BooleanPoint, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down
5 changes: 5 additions & 0 deletions influxql/iterator.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,11 @@ func (itr *{{$k.name}}IntervalIterator) Next() (*{{$k.Name}}Point, error) {
return nil, err
}
p.Time, _ = itr.opt.Window(p.Time)
// If we see the minimum allowable time, set the time to zero so we don't
// break the default returned time for aggregate queries without times.
if p.Time == MinTime {
p.Time = 0
}
return p, nil
}

Expand Down
13 changes: 11 additions & 2 deletions influxql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ var ErrUnknownCall = errors.New("unknown call")

const (
// MinTime is used as the minimum time value when computing an unbounded range.
MinTime = int64(0)
// This time is one less than the MinNanoTime so that the first minimum
// time can be used as a sentinel value to signify that it is the default
// value rather than explicitly set by the user.
MinTime = models.MinNanoTime - 1

// MaxTime is used as the maximum time value when computing an unbounded range.
// This time is 2262-04-11 23:47:16.854775806 +0000 UTC
Expand Down Expand Up @@ -925,7 +928,13 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) {
t -= int64(opt.Interval.Offset)

// Truncate time by duration.
t -= t % int64(opt.Interval.Duration)
dt := t % int64(opt.Interval.Duration)
if dt < 0 {
// Negative modulo rounds up instead of down, so offset
// with the duration.
dt += int64(opt.Interval.Duration)
}
t -= dt

// Apply the offset.
start = t + int64(opt.Interval.Offset)
Expand Down
6 changes: 4 additions & 2 deletions influxql/point.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ import (
"encoding/binary"
"fmt"
"io"
"math"
"sort"

"github.com/gogo/protobuf/proto"
internal "github.com/influxdata/influxdb/influxql/internal"
)

// ZeroTime is the Unix nanosecond timestamp for time.Time{}.
const ZeroTime = int64(-6795364578871345152)
// ZeroTime is the Unix nanosecond timestamp for no time.
// This time is not used by the query engine or the storage engine as a valid time.
const ZeroTime = int64(math.MinInt64)

// Point represents a value in a series that occurred at a given time.
type Point interface {
Expand Down
2 changes: 1 addition & 1 deletion models/points_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1225,7 +1225,7 @@ func TestParsePointMaxTimestamp(t *testing.T) {
}

func TestParsePointMinTimestamp(t *testing.T) {
test(t, `cpu value=1 -9223372036854775808`,
test(t, `cpu value=1 -9223372036854775806`,
NewTestPoint(
"cpu",
models.Tags{},
Expand Down
14 changes: 12 additions & 2 deletions models/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,24 @@ import (
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224192 +0000 UTC
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
MinNanoTime = int64(math.MinInt64)
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2

// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)

Expand Down
4 changes: 3 additions & 1 deletion tsdb/cursor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package tsdb

import "github.com/influxdata/influxdb/influxql"

// EOF represents a "not found" key returned by a Cursor.
const EOF = int64(-1)
const EOF = influxql.ZeroTime

// Cursor represents an iterator over a series.
type Cursor interface {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ func (f *FileStore) locations(key string, t int64, ascending bool) []*location {
// skip it.
if ascending && maxTime < t {
continue
// If we are descending and the min time fo the file is after where we want to start,
// If we are descending and the min time of the file is after where we want to start,
// then skip it.
} else if !ascending && minTime > t {
continue
Expand Down
2 changes: 1 addition & 1 deletion tsdb/engine/tsm1/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ func (d *indirectIndex) Entries(key string) []IndexEntry {
}

// The search may have returned an i == 0 which could indicated that the value
// searched should be inserted at postion 0. Make sure the key in the index
// searched should be inserted at position 0. Make sure the key in the index
// matches the search value.
if !bytes.Equal(kb, k) {
return nil
Expand Down

0 comments on commit 10029ca

Please sign in to comment.