diff --git a/src/dbnode/encoding/m3tsz/encoder_test.go b/src/dbnode/encoding/m3tsz/encoder_test.go index 7b85b790b7..388855aecc 100644 --- a/src/dbnode/encoding/m3tsz/encoder_test.go +++ b/src/dbnode/encoding/m3tsz/encoder_test.go @@ -539,3 +539,169 @@ func testMultiplePasses(t *testing.T, test multiplePassesTest) { } } } + +func TestEncoderFailOnDeltaOfDeltaOverflow(t *testing.T) { + tests := []struct { + name string + delta time.Duration + units xtime.Unit + positiveErrMsg string + negativeErrMsg string + }{ + { + name: "seconds, short gap", + delta: time.Hour, + units: xtime.Second, + }, + { + name: "seconds, huge gap", + delta: 25 * 24 * time.Hour, + units: xtime.Second, + }, + { + name: "seconds, too big gap", + delta: 1000 * 25 * 24 * time.Hour, // more than 2^31 s + units: xtime.Second, + positiveErrMsg: "deltaOfDelta value 2160000000 s overflows 32 bits", + negativeErrMsg: "deltaOfDelta value -2159999999 s overflows 32 bits", + }, + { + name: "milliseconds, short gap", + delta: time.Hour, + units: xtime.Millisecond, + }, + { + name: "milliseconds, almost too big gap", + delta: 24 * 24 * time.Hour, // slightly less than 2^31 ms + units: xtime.Millisecond, + }, + { + name: "milliseconds, too big gap", + delta: 25 * 24 * time.Hour, // more than 2^31 ms + units: xtime.Millisecond, + positiveErrMsg: "deltaOfDelta value 2160000000 ms overflows 32 bits", + negativeErrMsg: "deltaOfDelta value -2159999999 ms overflows 32 bits", + }, + { + name: "microseconds, short gap", + delta: time.Hour, + units: xtime.Microsecond, + }, + { + name: "microseconds, huge gap", + delta: 25 * 24 * time.Hour, + units: xtime.Microsecond, + }, + { + name: "nanoseconds, short gap", + delta: time.Hour, + units: xtime.Nanosecond, + }, + { + name: "nanoseconds, huge gap", + delta: 25 * 24 * time.Hour, + units: xtime.Nanosecond, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testPositiveDeltaOfDelta(t, tt.delta, tt.units, tt.positiveErrMsg) + testNegativeDeltaOfDelta(t, tt.delta, tt.units, tt.negativeErrMsg) + }) + } +} + +func testPositiveDeltaOfDelta(t *testing.T, delta time.Duration, units xtime.Unit, expectedErrMsg string) { + t.Helper() + + ctx := context.NewBackground() + defer ctx.Close() + + enc := getTestEncoder(testStartTime) + + dp1 := dp(testStartTime, 1) + dp2 := dp(testStartTime.Add(delta), 2) + + err := enc.Encode(dp1, units, nil) + require.NoError(t, err) + + err = enc.Encode(dp2, units, nil) + if expectedErrMsg != "" { + require.EqualError(t, err, expectedErrMsg) + return + } + require.NoError(t, err) + + dec := NewDecoder(enc.intOptimized, enc.opts) + stream, ok := enc.Stream(ctx) + require.True(t, ok) + + it := dec.Decode(stream) + defer it.Close() + + decodeAndCheck(t, it, dp1, units) + decodeAndCheck(t, it, dp2, units) + + require.False(t, it.Next()) + require.NoError(t, it.Err()) +} + +func testNegativeDeltaOfDelta(t *testing.T, delta time.Duration, units xtime.Unit, expectedErrMsg string) { + t.Helper() + + ctx := context.NewBackground() + defer ctx.Close() + + oneUnit, err := units.Value() + require.NoError(t, err) + + enc := getTestEncoder(testStartTime) + + dps := []ts.Datapoint{ + dp(testStartTime, 1), + dp(testStartTime.Add(delta/2), 2), + dp(testStartTime.Add(delta/2+delta), 3), + dp(testStartTime.Add(delta/2+delta+oneUnit), 4), // DoD = oneUnit - delta + } + + for i, dp := range dps { + err = enc.Encode(dp, units, nil) + if i == 3 && expectedErrMsg != "" { + require.EqualError(t, err, expectedErrMsg) + return + } + require.NoError(t, err) + } + + dec := NewDecoder(enc.intOptimized, enc.opts) + stream, ok := enc.Stream(ctx) + require.True(t, ok) + + it := dec.Decode(stream) + defer it.Close() + + for _, dp := range dps { + decodeAndCheck(t, it, dp, units) + } + + require.False(t, it.Next()) + require.NoError(t, it.Err()) +} + +func dp(timestamp time.Time, value float64) ts.Datapoint { + return ts.Datapoint{ + Timestamp: timestamp, + TimestampNanos: xtime.ToUnixNano(timestamp), + Value: value, + } +} + +func decodeAndCheck(t *testing.T, it encoding.ReaderIterator, dp ts.Datapoint, units xtime.Unit) { + t.Helper() + + require.True(t, it.Next()) + decodedDP, decodedUnits, _ := it.Current() + assert.Equal(t, dp, decodedDP) + assert.Equal(t, units, decodedUnits) +} diff --git a/src/dbnode/encoding/m3tsz/timestamp_encoder.go b/src/dbnode/encoding/m3tsz/timestamp_encoder.go index 939372b455..34228b06be 100644 --- a/src/dbnode/encoding/m3tsz/timestamp_encoder.go +++ b/src/dbnode/encoding/m3tsz/timestamp_encoder.go @@ -187,6 +187,16 @@ func (enc *TimestampEncoder) writeDeltaOfDeltaTimeUnitUnchanged( } deltaOfDelta := xtime.ToNormalizedDuration(curDelta-prevDelta, u) + if timeUnit == xtime.Millisecond || timeUnit == xtime.Second { + // Only milliseconds and seconds are encoded using + // up to 32 bits (see defaultTimeEncodingSchemes). + dod32 := int32(deltaOfDelta) + if int64(dod32) != deltaOfDelta { + return fmt.Errorf( + "deltaOfDelta value %d %s overflows 32 bits", deltaOfDelta, timeUnit) + } + } + tes, exists := enc.Options.TimeEncodingSchemes().SchemeForUnit(timeUnit) if !exists { return fmt.Errorf("time encoding scheme for time unit %v doesn't exist", timeUnit) diff --git a/src/dbnode/encoding/m3tsz/timestamp_iterator.go b/src/dbnode/encoding/m3tsz/timestamp_iterator.go index a9da57e72b..27f3716a20 100644 --- a/src/dbnode/encoding/m3tsz/timestamp_iterator.go +++ b/src/dbnode/encoding/m3tsz/timestamp_iterator.go @@ -141,7 +141,7 @@ func (it *TimestampIterator) readNextTimestamp(stream *encoding.IStream) error { } it.PrevTimeDelta += dod - it.PrevTime = it.PrevTime + xtime.UnixNano(it.PrevTimeDelta) + it.PrevTime += xtime.UnixNano(it.PrevTimeDelta) return nil } diff --git a/src/x/time/unit.go b/src/x/time/unit.go index 8d423e7f3c..3f78709fc9 100644 --- a/src/x/time/unit.go +++ b/src/x/time/unit.go @@ -56,7 +56,7 @@ func (tu Unit) Value() (time.Duration, error) { if tu < 1 || int(tu) >= unitCount { return 0, errUnrecognizedTimeUnit } - return time.Duration(unitsToDuration[tu]), nil + return unitsToDuration[tu], nil } // Count returns the number of units contained within the duration.